Skip to main content

rustbac_client/
client.rs

1use crate::{
2    AlarmSummaryItem, AtomicReadFileResult, AtomicWriteFileResult, ClientBitString,
3    ClientDataValue, ClientError, CovNotification, CovPropertyValue, DiscoveredDevice,
4    DiscoveredObject, EnrollmentSummaryItem, EventInformationItem, EventInformationResult,
5    EventNotification, ReadRangeResult,
6};
7use rustbac_bacnet_sc::BacnetScTransport;
8use rustbac_core::apdu::{
9    AbortPdu, ApduType, BacnetError, ComplexAckHeader, ConfirmedRequestHeader, RejectPdu,
10    SegmentAck, SimpleAck, UnconfirmedRequestHeader,
11};
12use rustbac_core::encoding::{reader::Reader, writer::Writer};
13use rustbac_core::npdu::Npdu;
14use rustbac_core::services::acknowledge_alarm::{
15    AcknowledgeAlarmRequest, SERVICE_ACKNOWLEDGE_ALARM,
16};
17use rustbac_core::services::alarm_summary::{
18    AlarmSummaryItem as CoreAlarmSummaryItem, GetAlarmSummaryAck, GetAlarmSummaryRequest,
19    SERVICE_GET_ALARM_SUMMARY,
20};
21use rustbac_core::services::atomic_read_file::{
22    AtomicReadFileAck, AtomicReadFileAckAccess, AtomicReadFileRequest, SERVICE_ATOMIC_READ_FILE,
23};
24use rustbac_core::services::atomic_write_file::{
25    AtomicWriteFileAck, AtomicWriteFileRequest, SERVICE_ATOMIC_WRITE_FILE,
26};
27use rustbac_core::services::cov_notification::{
28    CovNotificationRequest, SERVICE_CONFIRMED_COV_NOTIFICATION,
29    SERVICE_UNCONFIRMED_COV_NOTIFICATION,
30};
31use rustbac_core::services::device_management::{
32    DeviceCommunicationControlRequest, DeviceCommunicationState, ReinitializeDeviceRequest,
33    ReinitializeState, SERVICE_DEVICE_COMMUNICATION_CONTROL, SERVICE_REINITIALIZE_DEVICE,
34};
35use rustbac_core::services::enrollment_summary::{
36    EnrollmentSummaryItem as CoreEnrollmentSummaryItem, GetEnrollmentSummaryAck,
37    GetEnrollmentSummaryRequest, SERVICE_GET_ENROLLMENT_SUMMARY,
38};
39use rustbac_core::services::event_information::{
40    EventSummaryItem as CoreEventSummaryItem, GetEventInformationAck, GetEventInformationRequest,
41    SERVICE_GET_EVENT_INFORMATION,
42};
43use rustbac_core::services::event_notification::{
44    EventNotificationRequest, SERVICE_CONFIRMED_EVENT_NOTIFICATION,
45    SERVICE_UNCONFIRMED_EVENT_NOTIFICATION,
46};
47use rustbac_core::services::i_am::{IAmRequest, SERVICE_I_AM};
48use rustbac_core::services::list_element::{
49    AddListElementRequest, RemoveListElementRequest, SERVICE_ADD_LIST_ELEMENT,
50    SERVICE_REMOVE_LIST_ELEMENT,
51};
52use rustbac_core::services::object_management::{
53    CreateObjectAck, CreateObjectRequest, DeleteObjectRequest, SERVICE_CREATE_OBJECT,
54    SERVICE_DELETE_OBJECT,
55};
56use rustbac_core::services::private_transfer::{
57    ConfirmedPrivateTransferAck as PrivateTransferAck, ConfirmedPrivateTransferRequest,
58    SERVICE_CONFIRMED_PRIVATE_TRANSFER,
59};
60use rustbac_core::services::read_property::{
61    ReadPropertyAck, ReadPropertyRequest, SERVICE_READ_PROPERTY,
62};
63use rustbac_core::services::read_property_multiple::{
64    PropertyReference, ReadAccessSpecification, ReadPropertyMultipleAck,
65    ReadPropertyMultipleRequest, SERVICE_READ_PROPERTY_MULTIPLE,
66};
67use rustbac_core::services::read_range::{ReadRangeAck, ReadRangeRequest, SERVICE_READ_RANGE};
68use rustbac_core::services::subscribe_cov::{SubscribeCovRequest, SERVICE_SUBSCRIBE_COV};
69use rustbac_core::services::subscribe_cov_property::{
70    SubscribeCovPropertyRequest, SERVICE_SUBSCRIBE_COV_PROPERTY,
71};
72use rustbac_core::services::time_synchronization::TimeSynchronizationRequest;
73use rustbac_core::services::who_has::{IHaveRequest, WhoHasObject, WhoHasRequest, SERVICE_I_HAVE};
74use rustbac_core::services::who_is::WhoIsRequest;
75use rustbac_core::services::write_property::{WritePropertyRequest, SERVICE_WRITE_PROPERTY};
76use rustbac_core::services::write_property_multiple::{
77    PropertyWriteSpec, WriteAccessSpecification, WritePropertyMultipleRequest,
78    SERVICE_WRITE_PROPERTY_MULTIPLE,
79};
80use rustbac_core::types::{DataValue, Date, ErrorClass, ErrorCode, ObjectId, PropertyId, Time};
81use rustbac_core::EncodeError;
82use rustbac_datalink::bip::transport::{
83    BacnetIpTransport, BroadcastDistributionEntry, ForeignDeviceTableEntry,
84};
85use rustbac_datalink::{DataLink, DataLinkAddress, DataLinkError};
86use std::collections::HashSet;
87use std::net::{IpAddr, Ipv4Addr, SocketAddr, SocketAddrV4};
88use std::time::Duration;
89use tokio::sync::Mutex;
90use tokio::task::JoinHandle;
91use tokio::time::{timeout, Instant};
92
/// Lower bound, in bytes, applied to the per-segment payload size when a confirmed
/// request is split into segments.
const MIN_SEGMENT_DATA_LEN: usize = 32;
/// Upper bound (1 MiB) on the payload reassembled from a ComplexAck; exceeding it is
/// reported as `ClientError::ResponseTooLarge`.
const MAX_COMPLEX_ACK_REASSEMBLY_BYTES: usize = 1024 * 1024;
95
/// BACnet client bound to a datalink transport `D`.
///
/// Bundles the transport with the invoke-id counter, a lock taken around
/// send/receive exchanges, and the timeout and segmentation settings.
#[derive(Debug)]
pub struct BacnetClient<D: DataLink> {
    /// Transport used to send and receive frames.
    datalink: D,
    /// Next invoke id to hand out; advanced by `next_invoke_id`.
    invoke_id: Mutex<u8>,
    /// Taken by `who_is` / `who_has` for the whole send-then-collect exchange.
    request_io_lock: Mutex<()>,
    /// Timeout passed to the response wait of confirmed requests (default 3 s).
    response_timeout: Duration,
    /// Configured window size for segmented requests (default 1, never below 1).
    segmented_request_window_size: u8,
    /// Resend attempts per batch after a SegmentAck timeout or negative ack (default 2).
    segmented_request_retries: u8,
    /// Longest wait for a SegmentAck after sending one batch (default 500 ms).
    segment_ack_timeout: Duration,
}
106
/// Handle to the background task spawned by `start_foreign_device_renewal`.
///
/// The task is aborted when `stop` is called or when the handle is dropped.
#[derive(Debug)]
pub struct ForeignDeviceRenewal {
    /// Join handle of the spawned renewal loop.
    task: JoinHandle<()>,
}
111
112impl ForeignDeviceRenewal {
113    pub fn stop(self) {
114        self.task.abort();
115    }
116}
117
118impl Drop for ForeignDeviceRenewal {
119    fn drop(&mut self) {
120        self.task.abort();
121    }
122}
123
124impl BacnetClient<BacnetIpTransport> {
125    pub async fn new() -> Result<Self, ClientError> {
126        let bind_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::UNSPECIFIED), 0);
127        let datalink = BacnetIpTransport::bind(bind_addr).await?;
128        Ok(Self {
129            datalink,
130            invoke_id: Mutex::new(1),
131            request_io_lock: Mutex::new(()),
132            response_timeout: Duration::from_secs(3),
133            segmented_request_window_size: 1,
134            segmented_request_retries: 2,
135            segment_ack_timeout: Duration::from_millis(500),
136        })
137    }
138
139    pub async fn new_foreign(bbmd_addr: SocketAddr, ttl_seconds: u16) -> Result<Self, ClientError> {
140        let bind_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::UNSPECIFIED), 0);
141        let datalink = BacnetIpTransport::bind_foreign(bind_addr, bbmd_addr).await?;
142        datalink.register_foreign_device(ttl_seconds).await?;
143        Ok(Self {
144            datalink,
145            invoke_id: Mutex::new(1),
146            request_io_lock: Mutex::new(()),
147            response_timeout: Duration::from_secs(3),
148            segmented_request_window_size: 1,
149            segmented_request_retries: 2,
150            segment_ack_timeout: Duration::from_millis(500),
151        })
152    }
153
154    pub async fn register_foreign_device(&self, ttl_seconds: u16) -> Result<(), ClientError> {
155        self.datalink.register_foreign_device(ttl_seconds).await?;
156        Ok(())
157    }
158
159    pub async fn read_broadcast_distribution_table(
160        &self,
161    ) -> Result<Vec<BroadcastDistributionEntry>, ClientError> {
162        self.datalink
163            .read_broadcast_distribution_table()
164            .await
165            .map_err(ClientError::from)
166    }
167
168    pub async fn write_broadcast_distribution_table(
169        &self,
170        entries: &[BroadcastDistributionEntry],
171    ) -> Result<(), ClientError> {
172        self.datalink
173            .write_broadcast_distribution_table(entries)
174            .await?;
175        Ok(())
176    }
177
178    pub async fn read_foreign_device_table(
179        &self,
180    ) -> Result<Vec<ForeignDeviceTableEntry>, ClientError> {
181        self.datalink
182            .read_foreign_device_table()
183            .await
184            .map_err(ClientError::from)
185    }
186
187    pub async fn delete_foreign_device_table_entry(
188        &self,
189        address: SocketAddrV4,
190    ) -> Result<(), ClientError> {
191        self.datalink
192            .delete_foreign_device_table_entry(address)
193            .await?;
194        Ok(())
195    }
196
197    pub fn start_foreign_device_renewal(
198        &self,
199        ttl_seconds: u16,
200    ) -> Result<ForeignDeviceRenewal, ClientError> {
201        if ttl_seconds == 0 {
202            return Err(EncodeError::InvalidLength.into());
203        }
204
205        let datalink = self.datalink.clone();
206        let refresh_seconds = u64::from(ttl_seconds).saturating_mul(3) / 4;
207        let interval = Duration::from_secs(refresh_seconds.max(1));
208        let task = tokio::spawn(async move {
209            loop {
210                tokio::time::sleep(interval).await;
211                if let Err(err) = datalink.register_foreign_device_no_wait(ttl_seconds).await {
212                    log::warn!("foreign device renewal send failed: {err}");
213                }
214            }
215        });
216        Ok(ForeignDeviceRenewal { task })
217    }
218}
219
220impl BacnetClient<BacnetScTransport> {
221    pub async fn new_sc(endpoint: impl Into<String>) -> Result<Self, ClientError> {
222        let datalink = BacnetScTransport::connect(endpoint).await?;
223        Ok(Self::with_datalink(datalink))
224    }
225}
226
227impl<D: DataLink> BacnetClient<D> {
228    pub fn with_datalink(datalink: D) -> Self {
229        Self {
230            datalink,
231            invoke_id: Mutex::new(1),
232            request_io_lock: Mutex::new(()),
233            response_timeout: Duration::from_secs(3),
234            segmented_request_window_size: 1,
235            segmented_request_retries: 2,
236            segment_ack_timeout: Duration::from_millis(500),
237        }
238    }
239
240    pub fn with_response_timeout(mut self, timeout: Duration) -> Self {
241        self.response_timeout = timeout;
242        self
243    }
244
245    pub fn with_segmented_request_window_size(mut self, window_size: u8) -> Self {
246        self.segmented_request_window_size = window_size.max(1);
247        self
248    }
249
250    pub fn with_segmented_request_retries(mut self, retries: u8) -> Self {
251        self.segmented_request_retries = retries;
252        self
253    }
254
255    pub fn with_segment_ack_timeout(mut self, timeout: Duration) -> Self {
256        self.segment_ack_timeout = timeout.max(Duration::from_millis(1));
257        self
258    }
259
260    async fn next_invoke_id(&self) -> u8 {
261        let mut lock = self.invoke_id.lock().await;
262        let id = *lock;
263        *lock = lock.wrapping_add(1);
264        if *lock == 0 {
265            *lock = 1;
266        }
267        id
268    }
269
270    async fn send_segment_ack(
271        &self,
272        address: DataLinkAddress,
273        invoke_id: u8,
274        sequence_number: u8,
275        window_size: u8,
276    ) -> Result<(), ClientError> {
277        let mut tx = [0u8; 64];
278        let mut w = Writer::new(&mut tx);
279        Npdu::new(0).encode(&mut w)?;
280        SegmentAck {
281            negative_ack: false,
282            sent_by_server: false,
283            invoke_id,
284            sequence_number,
285            actual_window_size: window_size,
286        }
287        .encode(&mut w)?;
288        self.datalink.send(address, w.as_written()).await?;
289        Ok(())
290    }
291
292    async fn recv_ignoring_invalid_frame(
293        &self,
294        buf: &mut [u8],
295        deadline: Instant,
296    ) -> Result<(usize, DataLinkAddress), ClientError> {
297        loop {
298            let remaining = deadline.saturating_duration_since(Instant::now());
299            if remaining.is_zero() {
300                return Err(ClientError::Timeout);
301            }
302
303            match timeout(remaining, self.datalink.recv(buf)).await {
304                Err(_) => return Err(ClientError::Timeout),
305                Ok(Err(DataLinkError::InvalidFrame)) => continue,
306                Ok(Err(e)) => return Err(e.into()),
307                Ok(Ok(v)) => return Ok(v),
308            }
309        }
310    }
311
312    async fn send_simple_ack(
313        &self,
314        address: DataLinkAddress,
315        invoke_id: u8,
316        service_choice: u8,
317    ) -> Result<(), ClientError> {
318        let mut tx = [0u8; 64];
319        let mut w = Writer::new(&mut tx);
320        Npdu::new(0).encode(&mut w)?;
321        SimpleAck {
322            invoke_id,
323            service_choice,
324        }
325        .encode(&mut w)?;
326        self.datalink.send(address, w.as_written()).await?;
327        Ok(())
328    }
329
330    fn encode_with_growth<F>(&self, mut encode: F) -> Result<Vec<u8>, ClientError>
331    where
332        F: FnMut(&mut Writer<'_>) -> Result<(), EncodeError>,
333    {
334        for size in [512usize, 1024, 2048, 4096, 8192, 16_384, 32_768, 65_536] {
335            let mut buf = vec![0u8; size];
336            let mut w = Writer::new(&mut buf);
337            match encode(&mut w) {
338                Ok(()) => {
339                    let written_len = w.as_written().len();
340                    buf.truncate(written_len);
341                    return Ok(buf);
342                }
343                Err(EncodeError::BufferTooSmall) => continue,
344                Err(e) => return Err(e.into()),
345            }
346        }
347        Err(ClientError::SegmentedRequestTooLarge)
348    }
349
350    const fn max_apdu_octets(max_apdu_code: u8) -> usize {
351        match max_apdu_code & 0x0f {
352            0 => 50,
353            1 => 128,
354            2 => 206,
355            3 => 480,
356            4 => 1024,
357            5 => 1476,
358            _ => 480,
359        }
360    }
361
362    async fn await_segment_ack(
363        &self,
364        address: DataLinkAddress,
365        invoke_id: u8,
366        service_choice: u8,
367        expected_sequence: u8,
368        deadline: Instant,
369    ) -> Result<SegmentAck, ClientError> {
370        loop {
371            let remaining = deadline.saturating_duration_since(Instant::now());
372            if remaining.is_zero() {
373                return Err(ClientError::Timeout);
374            }
375
376            let mut rx = [0u8; 1500];
377            let recv = timeout(remaining, self.datalink.recv(&mut rx)).await;
378            let (n, src) = match recv {
379                Err(_) => return Err(ClientError::Timeout),
380                Ok(Err(DataLinkError::InvalidFrame)) => continue,
381                Ok(Err(e)) => return Err(e.into()),
382                Ok(Ok(v)) => v,
383            };
384            if src != address {
385                continue;
386            }
387
388            let Ok(apdu) = extract_apdu(&rx[..n]) else {
389                continue;
390            };
391            let first = *apdu.first().ok_or(ClientError::UnsupportedResponse)?;
392            match ApduType::from_u8(first >> 4) {
393                Some(ApduType::SegmentAck) => {
394                    let mut r = Reader::new(apdu);
395                    let ack = SegmentAck::decode(&mut r)?;
396                    if ack.invoke_id != invoke_id || !ack.sent_by_server {
397                        continue;
398                    }
399                    if ack.negative_ack {
400                        return Err(ClientError::SegmentNegativeAck {
401                            sequence_number: ack.sequence_number,
402                        });
403                    }
404                    if ack.sequence_number == expected_sequence {
405                        return Ok(ack);
406                    }
407                }
408                Some(ApduType::Error) => {
409                    let mut r = Reader::new(apdu);
410                    let err = BacnetError::decode(&mut r)?;
411                    if err.invoke_id == invoke_id && err.service_choice == service_choice {
412                        return Err(remote_service_error(err));
413                    }
414                }
415                Some(ApduType::Reject) => {
416                    let mut r = Reader::new(apdu);
417                    let rej = RejectPdu::decode(&mut r)?;
418                    if rej.invoke_id == invoke_id {
419                        return Err(ClientError::RemoteReject { reason: rej.reason });
420                    }
421                }
422                Some(ApduType::Abort) => {
423                    let mut r = Reader::new(apdu);
424                    let abort = AbortPdu::decode(&mut r)?;
425                    if abort.invoke_id == invoke_id {
426                        return Err(ClientError::RemoteAbort {
427                            reason: abort.reason,
428                            server: abort.server,
429                        });
430                    }
431                }
432                _ => continue,
433            }
434        }
435    }
436
437    async fn send_confirmed_request(
438        &self,
439        address: DataLinkAddress,
440        frame: &[u8],
441        deadline: Instant,
442    ) -> Result<(), ClientError> {
443        let mut pr = Reader::new(frame);
444        let _npdu = Npdu::decode(&mut pr)?;
445        let npdu_len = frame.len() - pr.remaining();
446        let npdu_bytes = &frame[..npdu_len];
447        let apdu = &frame[npdu_len..];
448
449        let mut ar = Reader::new(apdu);
450        let header = ConfirmedRequestHeader::decode(&mut ar)?;
451        let service_payload = ar.read_exact(ar.remaining())?;
452
453        let segment_data_len = Self::max_apdu_octets(header.max_apdu)
454            .saturating_sub(5)
455            .max(MIN_SEGMENT_DATA_LEN);
456        let segment_count = service_payload.len().div_ceil(segment_data_len);
457
458        if segment_count <= 1 {
459            self.datalink.send(address, frame).await?;
460            return Ok(());
461        }
462
463        if segment_count > usize::from(u8::MAX) + 1 {
464            return Err(ClientError::SegmentedRequestTooLarge);
465        }
466
467        let configured_window_size = self.segmented_request_window_size.max(1);
468        let mut window_size = configured_window_size;
469        let mut peer_window_ceiling = configured_window_size;
470        let mut batch_start = 0usize;
471        while batch_start < segment_count {
472            let batch_end = (batch_start + usize::from(window_size)).min(segment_count);
473            let expected_sequence = (batch_end - 1) as u8;
474
475            let mut frames = Vec::with_capacity(batch_end - batch_start);
476            for segment_index in batch_start..batch_end {
477                let seq = segment_index as u8;
478                let more_follows = segment_index + 1 < segment_count;
479                let start = segment_index * segment_data_len;
480                let end = ((segment_index + 1) * segment_data_len).min(service_payload.len());
481                let segment = &service_payload[start..end];
482
483                let seg_header = ConfirmedRequestHeader {
484                    segmented: true,
485                    more_follows,
486                    segmented_response_accepted: header.segmented_response_accepted,
487                    max_segments: header.max_segments,
488                    max_apdu: header.max_apdu,
489                    invoke_id: header.invoke_id,
490                    sequence_number: Some(seq),
491                    proposed_window_size: Some(window_size),
492                    service_choice: header.service_choice,
493                };
494
495                let mut tx = vec![0u8; npdu_bytes.len() + 16 + segment.len()];
496                let written_len = {
497                    let mut w = Writer::new(&mut tx);
498                    w.write_all(npdu_bytes)?;
499                    seg_header.encode(&mut w)?;
500                    w.write_all(segment)?;
501                    w.as_written().len()
502                };
503                tx.truncate(written_len);
504                frames.push(tx);
505            }
506
507            let mut retries_remaining = self.segmented_request_retries;
508            loop {
509                for frame in &frames {
510                    self.datalink.send(address, frame).await?;
511                }
512
513                if batch_end == segment_count {
514                    break;
515                }
516
517                let remaining = deadline.saturating_duration_since(Instant::now());
518                if remaining.is_zero() {
519                    return Err(ClientError::Timeout);
520                }
521                let ack_wait_deadline = Instant::now() + remaining.min(self.segment_ack_timeout);
522                match self
523                    .await_segment_ack(
524                        address,
525                        header.invoke_id,
526                        header.service_choice,
527                        expected_sequence,
528                        ack_wait_deadline,
529                    )
530                    .await
531                {
532                    Ok(ack) => {
533                        peer_window_ceiling =
534                            peer_window_ceiling.min(ack.actual_window_size.max(1));
535                        window_size = window_size
536                            .saturating_add(1)
537                            .min(configured_window_size)
538                            .min(peer_window_ceiling)
539                            .max(1);
540                        break;
541                    }
542                    Err(ClientError::Timeout | ClientError::SegmentNegativeAck { .. })
543                        if retries_remaining > 0 =>
544                    {
545                        retries_remaining -= 1;
546                        window_size = window_size.saturating_div(2).max(1);
547                        continue;
548                    }
549                    Err(e) => return Err(e),
550                }
551            }
552
553            batch_start = batch_end;
554        }
555
556        Ok(())
557    }
558
    /// Returns the service payload of a ComplexAck, reassembling it from follow-up
    /// segments when `first_header` marks the response as segmented.
    ///
    /// `first_payload` is the payload of the APDU already received. For a segmented
    /// response every accepted segment is acknowledged with a SegmentAck; a repeat of
    /// the last accepted segment is acknowledged again, and any other out-of-order
    /// segment is ignored. Frames from other addresses, and ComplexAcks for another
    /// invoke id or service choice, are skipped.
    ///
    /// # Errors
    /// - `ClientError::ResponseTooLarge` when the payload would exceed
    ///   `MAX_COMPLEX_ACK_REASSEMBLY_BYTES`.
    /// - `ClientError::UnsupportedResponse` when a segmented header lacks a sequence
    ///   number, a follow-up ComplexAck is not segmented, or an APDU is empty.
    /// - `ClientError::Timeout` when `deadline` passes before the last segment.
    /// - The remote service error, `RemoteReject` or `RemoteAbort` when the peer
    ///   answers this invoke id with an Error, Reject or Abort PDU.
    /// - A converted decode or transport error.
    async fn collect_complex_ack_payload(
        &self,
        address: DataLinkAddress,
        invoke_id: u8,
        service_choice: u8,
        first_header: ComplexAckHeader,
        first_payload: &[u8],
        deadline: Instant,
    ) -> Result<Vec<u8>, ClientError> {
        let mut payload = first_payload.to_vec();
        if payload.len() > MAX_COMPLEX_ACK_REASSEMBLY_BYTES {
            return Err(ClientError::ResponseTooLarge {
                limit: MAX_COMPLEX_ACK_REASSEMBLY_BYTES,
            });
        }
        // Unsegmented response: the first payload is the whole answer.
        if !first_header.segmented {
            return Ok(payload);
        }

        let mut last_seq = first_header
            .sequence_number
            .ok_or(ClientError::UnsupportedResponse)?;
        // Echo the window size the peer proposed, falling back to 1 when absent.
        let mut window_size = first_header.proposed_window_size.unwrap_or(1);
        // Acknowledge the first segment before waiting for the rest.
        self.send_segment_ack(address, invoke_id, last_seq, window_size)
            .await?;
        let mut more_follows = first_header.more_follows;

        while more_follows {
            let mut rx = [0u8; 1500];
            let (n, src) = self.recv_ignoring_invalid_frame(&mut rx, deadline).await?;
            if src != address {
                continue;
            }

            let Ok(apdu) = extract_apdu(&rx[..n]) else {
                continue;
            };
            let first = *apdu.first().ok_or(ClientError::UnsupportedResponse)?;
            // The PDU type lives in the high nibble of the first APDU octet.
            match ApduType::from_u8(first >> 4) {
                Some(ApduType::ComplexAck) => {
                    let mut r = Reader::new(apdu);
                    let seg = ComplexAckHeader::decode(&mut r)?;
                    if seg.invoke_id != invoke_id || seg.service_choice != service_choice {
                        continue;
                    }
                    if !seg.segmented {
                        return Err(ClientError::UnsupportedResponse);
                    }
                    let seq = seg
                        .sequence_number
                        .ok_or(ClientError::UnsupportedResponse)?;
                    if seq == last_seq {
                        // Duplicate segment: acknowledge again and continue waiting.
                        self.send_segment_ack(address, invoke_id, last_seq, window_size)
                            .await?;
                        continue;
                    }
                    // Only the direct successor (modulo 256) is accepted.
                    if seq != last_seq.wrapping_add(1) {
                        continue;
                    }

                    let seg_payload = r.read_exact(r.remaining())?;
                    // Enforce the reassembly cap before growing the buffer.
                    if payload.len().saturating_add(seg_payload.len())
                        > MAX_COMPLEX_ACK_REASSEMBLY_BYTES
                    {
                        return Err(ClientError::ResponseTooLarge {
                            limit: MAX_COMPLEX_ACK_REASSEMBLY_BYTES,
                        });
                    }
                    payload.extend_from_slice(seg_payload);

                    last_seq = seq;
                    more_follows = seg.more_follows;
                    window_size = seg.proposed_window_size.unwrap_or(window_size);
                    self.send_segment_ack(address, invoke_id, last_seq, window_size)
                        .await?;
                }
                Some(ApduType::Error) => {
                    let mut r = Reader::new(apdu);
                    let err = BacnetError::decode(&mut r)?;
                    if err.invoke_id == invoke_id && err.service_choice == service_choice {
                        return Err(remote_service_error(err));
                    }
                }
                Some(ApduType::Reject) => {
                    let mut r = Reader::new(apdu);
                    let rej = RejectPdu::decode(&mut r)?;
                    if rej.invoke_id == invoke_id {
                        return Err(ClientError::RemoteReject { reason: rej.reason });
                    }
                }
                Some(ApduType::Abort) => {
                    let mut r = Reader::new(apdu);
                    let abort = AbortPdu::decode(&mut r)?;
                    if abort.invoke_id == invoke_id {
                        return Err(ClientError::RemoteAbort {
                            reason: abort.reason,
                            server: abort.server,
                        });
                    }
                }
                _ => continue,
            }
        }

        Ok(payload)
    }
666
667    pub async fn who_is(
668        &self,
669        range: Option<(u32, u32)>,
670        wait: Duration,
671    ) -> Result<Vec<DiscoveredDevice>, ClientError> {
672        let _io_lock = self.request_io_lock.lock().await;
673        let req = match range {
674            Some((low, high)) => WhoIsRequest {
675                low_limit: Some(low),
676                high_limit: Some(high),
677            },
678            None => WhoIsRequest::global(),
679        };
680
681        let mut tx = [0u8; 128];
682        let mut w = Writer::new(&mut tx);
683        Npdu::new(0).encode(&mut w)?;
684        req.encode(&mut w)?;
685
686        self.datalink
687            .send(
688                DataLinkAddress::local_broadcast(DataLinkAddress::BACNET_IP_DEFAULT_PORT),
689                w.as_written(),
690            )
691            .await?;
692
693        let mut devices = Vec::new();
694        let mut seen = HashSet::new();
695        let deadline = tokio::time::Instant::now() + wait;
696
697        while tokio::time::Instant::now() < deadline {
698            let remaining = deadline.saturating_duration_since(tokio::time::Instant::now());
699            let mut rx = [0u8; 1500];
700            let recv = timeout(remaining, self.datalink.recv(&mut rx)).await;
701            match recv {
702                Ok(Ok((n, src))) => {
703                    let Ok(apdu) = extract_apdu(&rx[..n]) else {
704                        continue;
705                    };
706                    let mut r = Reader::new(apdu);
707                    let Ok(unconfirmed) = UnconfirmedRequestHeader::decode(&mut r) else {
708                        continue;
709                    };
710                    if unconfirmed.service_choice != SERVICE_I_AM {
711                        continue;
712                    }
713                    let Ok(i_am) = IAmRequest::decode_after_header(&mut r) else {
714                        continue;
715                    };
716                    if seen.insert(src) {
717                        devices.push(DiscoveredDevice {
718                            address: src,
719                            device_id: Some(i_am.device_id),
720                        });
721                    }
722                }
723                Ok(Err(DataLinkError::InvalidFrame)) => continue,
724                Ok(Err(e)) => return Err(e.into()),
725                Err(_) => break,
726            }
727        }
728
729        Ok(devices)
730    }
731
732    pub async fn who_has_object_id(
733        &self,
734        range: Option<(u32, u32)>,
735        object_id: ObjectId,
736        wait: Duration,
737    ) -> Result<Vec<DiscoveredObject>, ClientError> {
738        let req = WhoHasRequest {
739            low_limit: range.map(|(low, _)| low),
740            high_limit: range.map(|(_, high)| high),
741            object: WhoHasObject::ObjectId(object_id),
742        };
743        self.who_has(req, wait).await
744    }
745
746    pub async fn who_has_object_name(
747        &self,
748        range: Option<(u32, u32)>,
749        object_name: &str,
750        wait: Duration,
751    ) -> Result<Vec<DiscoveredObject>, ClientError> {
752        let req = WhoHasRequest {
753            low_limit: range.map(|(low, _)| low),
754            high_limit: range.map(|(_, high)| high),
755            object: WhoHasObject::ObjectName(object_name),
756        };
757        self.who_has(req, wait).await
758    }
759
760    async fn who_has(
761        &self,
762        request: WhoHasRequest<'_>,
763        wait: Duration,
764    ) -> Result<Vec<DiscoveredObject>, ClientError> {
765        let _io_lock = self.request_io_lock.lock().await;
766        let tx = self.encode_with_growth(|w| {
767            Npdu::new(0).encode(w)?;
768            request.encode(w)
769        })?;
770        self.datalink
771            .send(
772                DataLinkAddress::local_broadcast(DataLinkAddress::BACNET_IP_DEFAULT_PORT),
773                &tx,
774            )
775            .await?;
776
777        let mut objects = Vec::new();
778        let mut seen = HashSet::new();
779        let deadline = tokio::time::Instant::now() + wait;
780
781        while tokio::time::Instant::now() < deadline {
782            let remaining = deadline.saturating_duration_since(tokio::time::Instant::now());
783            let mut rx = [0u8; 1500];
784            let recv = timeout(remaining, self.datalink.recv(&mut rx)).await;
785            match recv {
786                Ok(Ok((n, src))) => {
787                    let Ok(apdu) = extract_apdu(&rx[..n]) else {
788                        continue;
789                    };
790                    let mut r = Reader::new(apdu);
791                    let Ok(unconfirmed) = UnconfirmedRequestHeader::decode(&mut r) else {
792                        continue;
793                    };
794                    if unconfirmed.service_choice != SERVICE_I_HAVE {
795                        continue;
796                    }
797                    let Ok(i_have) = IHaveRequest::decode_after_header(&mut r) else {
798                        continue;
799                    };
800                    if !seen.insert((src, i_have.object_id.raw())) {
801                        continue;
802                    }
803                    objects.push(DiscoveredObject {
804                        address: src,
805                        device_id: i_have.device_id,
806                        object_id: i_have.object_id,
807                        object_name: i_have.object_name.to_string(),
808                    });
809                }
810                Ok(Err(DataLinkError::InvalidFrame)) => continue,
811                Ok(Err(e)) => return Err(e.into()),
812                Err(_) => break,
813            }
814        }
815
816        Ok(objects)
817    }
818
819    pub async fn device_communication_control(
820        &self,
821        address: DataLinkAddress,
822        time_duration_seconds: Option<u16>,
823        enable_disable: DeviceCommunicationState,
824        password: Option<&str>,
825    ) -> Result<(), ClientError> {
826        let invoke_id = self.next_invoke_id().await;
827        let request = DeviceCommunicationControlRequest {
828            time_duration_seconds,
829            enable_disable,
830            password,
831            invoke_id,
832        };
833        let tx = self.encode_with_growth(|w| {
834            Npdu::new(0).encode(w)?;
835            request.encode(w)
836        })?;
837        self.await_simple_ack_or_error(
838            address,
839            &tx,
840            invoke_id,
841            SERVICE_DEVICE_COMMUNICATION_CONTROL,
842            self.response_timeout,
843        )
844        .await
845    }
846
847    pub async fn reinitialize_device(
848        &self,
849        address: DataLinkAddress,
850        state: ReinitializeState,
851        password: Option<&str>,
852    ) -> Result<(), ClientError> {
853        let invoke_id = self.next_invoke_id().await;
854        let request = ReinitializeDeviceRequest {
855            state,
856            password,
857            invoke_id,
858        };
859        let tx = self.encode_with_growth(|w| {
860            Npdu::new(0).encode(w)?;
861            request.encode(w)
862        })?;
863        self.await_simple_ack_or_error(
864            address,
865            &tx,
866            invoke_id,
867            SERVICE_REINITIALIZE_DEVICE,
868            self.response_timeout,
869        )
870        .await
871    }
872
873    pub async fn time_synchronize(
874        &self,
875        address: DataLinkAddress,
876        date: Date,
877        time: Time,
878        utc: bool,
879    ) -> Result<(), ClientError> {
880        let request = if utc {
881            TimeSynchronizationRequest::utc(date, time)
882        } else {
883            TimeSynchronizationRequest::local(date, time)
884        };
885        let tx = self.encode_with_growth(|w| {
886            Npdu::new(0).encode(w)?;
887            request.encode(w)
888        })?;
889        self.datalink.send(address, &tx).await?;
890        Ok(())
891    }
892
893    pub async fn create_object_by_type(
894        &self,
895        address: DataLinkAddress,
896        object_type: rustbac_core::types::ObjectType,
897    ) -> Result<ObjectId, ClientError> {
898        self.create_object(address, CreateObjectRequest::by_type(object_type, 0))
899            .await
900    }
901
902    pub async fn create_object(
903        &self,
904        address: DataLinkAddress,
905        mut request: CreateObjectRequest,
906    ) -> Result<ObjectId, ClientError> {
907        request.invoke_id = self.next_invoke_id().await;
908        let invoke_id = request.invoke_id;
909        let tx = self.encode_with_growth(|w| {
910            Npdu::new(0).encode(w)?;
911            request.encode(w)
912        })?;
913        let payload = self
914            .await_complex_ack_payload_or_error(
915                address,
916                &tx,
917                invoke_id,
918                SERVICE_CREATE_OBJECT,
919                self.response_timeout,
920            )
921            .await?;
922        let mut pr = Reader::new(&payload);
923        let parsed = CreateObjectAck::decode_after_header(&mut pr)?;
924        Ok(parsed.object_id)
925    }
926
927    pub async fn delete_object(
928        &self,
929        address: DataLinkAddress,
930        object_id: ObjectId,
931    ) -> Result<(), ClientError> {
932        let invoke_id = self.next_invoke_id().await;
933        let request = DeleteObjectRequest {
934            object_id,
935            invoke_id,
936        };
937        let tx = self.encode_with_growth(|w| {
938            Npdu::new(0).encode(w)?;
939            request.encode(w)
940        })?;
941        self.await_simple_ack_or_error(
942            address,
943            &tx,
944            invoke_id,
945            SERVICE_DELETE_OBJECT,
946            self.response_timeout,
947        )
948        .await
949    }
950
951    pub async fn add_list_element(
952        &self,
953        address: DataLinkAddress,
954        mut request: AddListElementRequest<'_>,
955    ) -> Result<(), ClientError> {
956        request.invoke_id = self.next_invoke_id().await;
957        let invoke_id = request.invoke_id;
958        let tx = self.encode_with_growth(|w| {
959            Npdu::new(0).encode(w)?;
960            request.encode(w)
961        })?;
962        self.await_simple_ack_or_error(
963            address,
964            &tx,
965            invoke_id,
966            SERVICE_ADD_LIST_ELEMENT,
967            self.response_timeout,
968        )
969        .await
970    }
971
972    pub async fn remove_list_element(
973        &self,
974        address: DataLinkAddress,
975        mut request: RemoveListElementRequest<'_>,
976    ) -> Result<(), ClientError> {
977        request.invoke_id = self.next_invoke_id().await;
978        let invoke_id = request.invoke_id;
979        let tx = self.encode_with_growth(|w| {
980            Npdu::new(0).encode(w)?;
981            request.encode(w)
982        })?;
983        self.await_simple_ack_or_error(
984            address,
985            &tx,
986            invoke_id,
987            SERVICE_REMOVE_LIST_ELEMENT,
988            self.response_timeout,
989        )
990        .await
991    }
992
993    async fn await_simple_ack_or_error(
994        &self,
995        address: DataLinkAddress,
996        tx: &[u8],
997        invoke_id: u8,
998        service_choice: u8,
999        timeout_window: Duration,
1000    ) -> Result<(), ClientError> {
1001        let _io_lock = self.request_io_lock.lock().await;
1002        let deadline = tokio::time::Instant::now() + timeout_window;
1003        self.send_confirmed_request(address, tx, deadline).await?;
1004        while tokio::time::Instant::now() < deadline {
1005            let mut rx = [0u8; 1500];
1006            let (n, src) = self.recv_ignoring_invalid_frame(&mut rx, deadline).await?;
1007            if src != address {
1008                continue;
1009            }
1010            let apdu = extract_apdu(&rx[..n])?;
1011            let first = *apdu.first().ok_or(ClientError::UnsupportedResponse)?;
1012            match ApduType::from_u8(first >> 4) {
1013                Some(ApduType::SimpleAck) => {
1014                    let mut r = Reader::new(apdu);
1015                    let ack = SimpleAck::decode(&mut r)?;
1016                    if ack.invoke_id == invoke_id && ack.service_choice == service_choice {
1017                        return Ok(());
1018                    }
1019                }
1020                Some(ApduType::Error) => {
1021                    let mut r = Reader::new(apdu);
1022                    let err = BacnetError::decode(&mut r)?;
1023                    if err.invoke_id == invoke_id && err.service_choice == service_choice {
1024                        return Err(remote_service_error(err));
1025                    }
1026                }
1027                Some(ApduType::Reject) => {
1028                    let mut r = Reader::new(apdu);
1029                    let rej = RejectPdu::decode(&mut r)?;
1030                    if rej.invoke_id == invoke_id {
1031                        return Err(ClientError::RemoteReject { reason: rej.reason });
1032                    }
1033                }
1034                Some(ApduType::Abort) => {
1035                    let mut r = Reader::new(apdu);
1036                    let abort = AbortPdu::decode(&mut r)?;
1037                    if abort.invoke_id == invoke_id {
1038                        return Err(ClientError::RemoteAbort {
1039                            reason: abort.reason,
1040                            server: abort.server,
1041                        });
1042                    }
1043                }
1044                _ => continue,
1045            }
1046        }
1047        Err(ClientError::Timeout)
1048    }
1049
1050    async fn await_complex_ack_payload_or_error(
1051        &self,
1052        address: DataLinkAddress,
1053        tx: &[u8],
1054        invoke_id: u8,
1055        service_choice: u8,
1056        timeout_window: Duration,
1057    ) -> Result<Vec<u8>, ClientError> {
1058        let _io_lock = self.request_io_lock.lock().await;
1059        let deadline = tokio::time::Instant::now() + timeout_window;
1060        self.send_confirmed_request(address, tx, deadline).await?;
1061        while tokio::time::Instant::now() < deadline {
1062            let mut rx = [0u8; 1500];
1063            let (n, src) = self.recv_ignoring_invalid_frame(&mut rx, deadline).await?;
1064            if src != address {
1065                continue;
1066            }
1067
1068            let apdu = extract_apdu(&rx[..n])?;
1069            let first = *apdu.first().ok_or(ClientError::UnsupportedResponse)?;
1070            match ApduType::from_u8(first >> 4) {
1071                Some(ApduType::ComplexAck) => {
1072                    let mut r = Reader::new(apdu);
1073                    let ack = ComplexAckHeader::decode(&mut r)?;
1074                    if ack.invoke_id != invoke_id || ack.service_choice != service_choice {
1075                        continue;
1076                    }
1077                    return self
1078                        .collect_complex_ack_payload(
1079                            address,
1080                            invoke_id,
1081                            service_choice,
1082                            ack,
1083                            r.read_exact(r.remaining())?,
1084                            deadline,
1085                        )
1086                        .await;
1087                }
1088                Some(ApduType::Error) => {
1089                    let mut r = Reader::new(apdu);
1090                    let err = BacnetError::decode(&mut r)?;
1091                    if err.invoke_id == invoke_id && err.service_choice == service_choice {
1092                        return Err(remote_service_error(err));
1093                    }
1094                }
1095                Some(ApduType::Reject) => {
1096                    let mut r = Reader::new(apdu);
1097                    let rej = RejectPdu::decode(&mut r)?;
1098                    if rej.invoke_id == invoke_id {
1099                        return Err(ClientError::RemoteReject { reason: rej.reason });
1100                    }
1101                }
1102                Some(ApduType::Abort) => {
1103                    let mut r = Reader::new(apdu);
1104                    let abort = AbortPdu::decode(&mut r)?;
1105                    if abort.invoke_id == invoke_id {
1106                        return Err(ClientError::RemoteAbort {
1107                            reason: abort.reason,
1108                            server: abort.server,
1109                        });
1110                    }
1111                }
1112                _ => continue,
1113            }
1114        }
1115        Err(ClientError::Timeout)
1116    }
1117
1118    pub async fn get_alarm_summary(
1119        &self,
1120        address: DataLinkAddress,
1121    ) -> Result<Vec<AlarmSummaryItem>, ClientError> {
1122        let invoke_id = self.next_invoke_id().await;
1123        let request = GetAlarmSummaryRequest { invoke_id };
1124        let tx = self.encode_with_growth(|w| {
1125            Npdu::new(0).encode(w)?;
1126            request.encode(w)
1127        })?;
1128        let payload = self
1129            .await_complex_ack_payload_or_error(
1130                address,
1131                &tx,
1132                invoke_id,
1133                SERVICE_GET_ALARM_SUMMARY,
1134                self.response_timeout,
1135            )
1136            .await?;
1137        let mut pr = Reader::new(&payload);
1138        let parsed = GetAlarmSummaryAck::decode_after_header(&mut pr)?;
1139        Ok(into_client_alarm_summary(parsed.summaries))
1140    }
1141
1142    pub async fn get_enrollment_summary(
1143        &self,
1144        address: DataLinkAddress,
1145    ) -> Result<Vec<EnrollmentSummaryItem>, ClientError> {
1146        let invoke_id = self.next_invoke_id().await;
1147        let request = GetEnrollmentSummaryRequest { invoke_id };
1148        let tx = self.encode_with_growth(|w| {
1149            Npdu::new(0).encode(w)?;
1150            request.encode(w)
1151        })?;
1152        let payload = self
1153            .await_complex_ack_payload_or_error(
1154                address,
1155                &tx,
1156                invoke_id,
1157                SERVICE_GET_ENROLLMENT_SUMMARY,
1158                self.response_timeout,
1159            )
1160            .await?;
1161        let mut pr = Reader::new(&payload);
1162        let parsed = GetEnrollmentSummaryAck::decode_after_header(&mut pr)?;
1163        Ok(into_client_enrollment_summary(parsed.summaries))
1164    }
1165
1166    pub async fn get_event_information(
1167        &self,
1168        address: DataLinkAddress,
1169        last_received_object_id: Option<ObjectId>,
1170    ) -> Result<EventInformationResult, ClientError> {
1171        let invoke_id = self.next_invoke_id().await;
1172        let request = GetEventInformationRequest {
1173            last_received_object_id,
1174            invoke_id,
1175        };
1176        let tx = self.encode_with_growth(|w| {
1177            Npdu::new(0).encode(w)?;
1178            request.encode(w)
1179        })?;
1180        let payload = self
1181            .await_complex_ack_payload_or_error(
1182                address,
1183                &tx,
1184                invoke_id,
1185                SERVICE_GET_EVENT_INFORMATION,
1186                self.response_timeout,
1187            )
1188            .await?;
1189        let mut pr = Reader::new(&payload);
1190        let parsed = GetEventInformationAck::decode_after_header(&mut pr)?;
1191        Ok(EventInformationResult {
1192            summaries: into_client_event_information(parsed.summaries),
1193            more_events: parsed.more_events,
1194        })
1195    }
1196
1197    pub async fn acknowledge_alarm(
1198        &self,
1199        address: DataLinkAddress,
1200        mut request: AcknowledgeAlarmRequest<'_>,
1201    ) -> Result<(), ClientError> {
1202        request.invoke_id = self.next_invoke_id().await;
1203        let invoke_id = request.invoke_id;
1204        let tx = self.encode_with_growth(|w| {
1205            Npdu::new(0).encode(w)?;
1206            request.encode(w)
1207        })?;
1208        self.await_simple_ack_or_error(
1209            address,
1210            &tx,
1211            invoke_id,
1212            SERVICE_ACKNOWLEDGE_ALARM,
1213            self.response_timeout,
1214        )
1215        .await
1216    }
1217
1218    pub async fn atomic_read_file_stream(
1219        &self,
1220        address: DataLinkAddress,
1221        file_object_id: ObjectId,
1222        file_start_position: i32,
1223        requested_octet_count: u32,
1224    ) -> Result<AtomicReadFileResult, ClientError> {
1225        let invoke_id = self.next_invoke_id().await;
1226        let request = AtomicReadFileRequest::stream(
1227            file_object_id,
1228            file_start_position,
1229            requested_octet_count,
1230            invoke_id,
1231        );
1232        self.atomic_read_file(address, request).await
1233    }
1234
1235    pub async fn atomic_read_file_record(
1236        &self,
1237        address: DataLinkAddress,
1238        file_object_id: ObjectId,
1239        file_start_record: i32,
1240        requested_record_count: u32,
1241    ) -> Result<AtomicReadFileResult, ClientError> {
1242        let invoke_id = self.next_invoke_id().await;
1243        let request = AtomicReadFileRequest::record(
1244            file_object_id,
1245            file_start_record,
1246            requested_record_count,
1247            invoke_id,
1248        );
1249        self.atomic_read_file(address, request).await
1250    }
1251
1252    async fn atomic_read_file(
1253        &self,
1254        address: DataLinkAddress,
1255        request: AtomicReadFileRequest,
1256    ) -> Result<AtomicReadFileResult, ClientError> {
1257        let invoke_id = request.invoke_id;
1258        let tx = self.encode_with_growth(|w| {
1259            Npdu::new(0).encode(w)?;
1260            request.encode(w)
1261        })?;
1262        let payload = self
1263            .await_complex_ack_payload_or_error(
1264                address,
1265                &tx,
1266                invoke_id,
1267                SERVICE_ATOMIC_READ_FILE,
1268                self.response_timeout,
1269            )
1270            .await?;
1271        let mut pr = Reader::new(&payload);
1272        let parsed = AtomicReadFileAck::decode_after_header(&mut pr)?;
1273        Ok(into_client_atomic_read_result(parsed))
1274    }
1275
1276    pub async fn atomic_write_file_stream(
1277        &self,
1278        address: DataLinkAddress,
1279        file_object_id: ObjectId,
1280        file_start_position: i32,
1281        file_data: &[u8],
1282    ) -> Result<AtomicWriteFileResult, ClientError> {
1283        let invoke_id = self.next_invoke_id().await;
1284        let request = AtomicWriteFileRequest::stream(
1285            file_object_id,
1286            file_start_position,
1287            file_data,
1288            invoke_id,
1289        );
1290        self.atomic_write_file(address, request).await
1291    }
1292
1293    pub async fn atomic_write_file_record(
1294        &self,
1295        address: DataLinkAddress,
1296        file_object_id: ObjectId,
1297        file_start_record: i32,
1298        file_record_data: &[&[u8]],
1299    ) -> Result<AtomicWriteFileResult, ClientError> {
1300        let invoke_id = self.next_invoke_id().await;
1301        let request = AtomicWriteFileRequest::record(
1302            file_object_id,
1303            file_start_record,
1304            file_record_data,
1305            invoke_id,
1306        );
1307        self.atomic_write_file(address, request).await
1308    }
1309
1310    async fn atomic_write_file(
1311        &self,
1312        address: DataLinkAddress,
1313        request: AtomicWriteFileRequest<'_>,
1314    ) -> Result<AtomicWriteFileResult, ClientError> {
1315        let invoke_id = request.invoke_id;
1316        let tx = self.encode_with_growth(|w| {
1317            Npdu::new(0).encode(w)?;
1318            request.encode(w)
1319        })?;
1320        let payload = self
1321            .await_complex_ack_payload_or_error(
1322                address,
1323                &tx,
1324                invoke_id,
1325                SERVICE_ATOMIC_WRITE_FILE,
1326                self.response_timeout,
1327            )
1328            .await?;
1329        let mut pr = Reader::new(&payload);
1330        let parsed = AtomicWriteFileAck::decode_after_header(&mut pr)?;
1331        Ok(into_client_atomic_write_result(parsed))
1332    }
1333
1334    pub async fn subscribe_cov(
1335        &self,
1336        address: DataLinkAddress,
1337        mut request: SubscribeCovRequest,
1338    ) -> Result<(), ClientError> {
1339        request.invoke_id = self.next_invoke_id().await;
1340        let invoke_id = request.invoke_id;
1341        let tx = self.encode_with_growth(|w| {
1342            Npdu::new(0).encode(w)?;
1343            request.encode(w)
1344        })?;
1345        self.await_simple_ack_or_error(
1346            address,
1347            &tx,
1348            invoke_id,
1349            SERVICE_SUBSCRIBE_COV,
1350            self.response_timeout,
1351        )
1352        .await
1353    }
1354
1355    pub async fn cancel_cov_subscription(
1356        &self,
1357        address: DataLinkAddress,
1358        subscriber_process_id: u32,
1359        monitored_object_id: ObjectId,
1360    ) -> Result<(), ClientError> {
1361        self.subscribe_cov(
1362            address,
1363            SubscribeCovRequest::cancel(subscriber_process_id, monitored_object_id, 0),
1364        )
1365        .await
1366    }
1367
1368    pub async fn subscribe_cov_property(
1369        &self,
1370        address: DataLinkAddress,
1371        mut request: SubscribeCovPropertyRequest,
1372    ) -> Result<(), ClientError> {
1373        request.invoke_id = self.next_invoke_id().await;
1374        let invoke_id = request.invoke_id;
1375        let tx = self.encode_with_growth(|w| {
1376            Npdu::new(0).encode(w)?;
1377            request.encode(w)
1378        })?;
1379        self.await_simple_ack_or_error(
1380            address,
1381            &tx,
1382            invoke_id,
1383            SERVICE_SUBSCRIBE_COV_PROPERTY,
1384            self.response_timeout,
1385        )
1386        .await
1387    }
1388
1389    pub async fn cancel_cov_property_subscription(
1390        &self,
1391        address: DataLinkAddress,
1392        subscriber_process_id: u32,
1393        monitored_object_id: ObjectId,
1394        monitored_property_id: PropertyId,
1395        monitored_property_array_index: Option<u32>,
1396    ) -> Result<(), ClientError> {
1397        self.subscribe_cov_property(
1398            address,
1399            SubscribeCovPropertyRequest::cancel(
1400                subscriber_process_id,
1401                monitored_object_id,
1402                monitored_property_id,
1403                monitored_property_array_index,
1404                0,
1405            ),
1406        )
1407        .await
1408    }
1409
1410    pub async fn read_range_by_position(
1411        &self,
1412        address: DataLinkAddress,
1413        object_id: ObjectId,
1414        property_id: PropertyId,
1415        array_index: Option<u32>,
1416        reference_index: i32,
1417        count: i16,
1418    ) -> Result<ReadRangeResult, ClientError> {
1419        let invoke_id = self.next_invoke_id().await;
1420        let req = ReadRangeRequest::by_position(
1421            object_id,
1422            property_id,
1423            array_index,
1424            reference_index,
1425            count,
1426            invoke_id,
1427        );
1428        self.read_range_with_request(address, req).await
1429    }
1430
1431    pub async fn read_range_by_sequence_number(
1432        &self,
1433        address: DataLinkAddress,
1434        object_id: ObjectId,
1435        property_id: PropertyId,
1436        array_index: Option<u32>,
1437        reference_sequence: u32,
1438        count: i16,
1439    ) -> Result<ReadRangeResult, ClientError> {
1440        let invoke_id = self.next_invoke_id().await;
1441        let req = ReadRangeRequest::by_sequence_number(
1442            object_id,
1443            property_id,
1444            array_index,
1445            reference_sequence,
1446            count,
1447            invoke_id,
1448        );
1449        self.read_range_with_request(address, req).await
1450    }
1451
1452    pub async fn read_range_by_time(
1453        &self,
1454        address: DataLinkAddress,
1455        object_id: ObjectId,
1456        property_id: PropertyId,
1457        array_index: Option<u32>,
1458        at: (Date, Time),
1459        count: i16,
1460    ) -> Result<ReadRangeResult, ClientError> {
1461        let (date, time) = at;
1462        let invoke_id = self.next_invoke_id().await;
1463        let req = ReadRangeRequest::by_time(
1464            object_id,
1465            property_id,
1466            array_index,
1467            date,
1468            time,
1469            count,
1470            invoke_id,
1471        );
1472        self.read_range_with_request(address, req).await
1473    }
1474
1475    async fn read_range_with_request(
1476        &self,
1477        address: DataLinkAddress,
1478        req: ReadRangeRequest,
1479    ) -> Result<ReadRangeResult, ClientError> {
1480        let invoke_id = req.invoke_id;
1481        let tx = self.encode_with_growth(|w| {
1482            Npdu::new(0).encode(w)?;
1483            req.encode(w)
1484        })?;
1485        let payload = self
1486            .await_complex_ack_payload_or_error(
1487                address,
1488                &tx,
1489                invoke_id,
1490                SERVICE_READ_RANGE,
1491                self.response_timeout,
1492            )
1493            .await?;
1494        let mut pr = Reader::new(&payload);
1495        let parsed = ReadRangeAck::decode_after_header(&mut pr)?;
1496        into_client_read_range(parsed)
1497    }
1498
1499    pub async fn recv_cov_notification(
1500        &self,
1501        wait: Duration,
1502    ) -> Result<Option<CovNotification>, ClientError> {
1503        let _io_lock = self.request_io_lock.lock().await;
1504        let deadline = tokio::time::Instant::now() + wait;
1505
1506        while tokio::time::Instant::now() < deadline {
1507            let remaining = deadline.saturating_duration_since(tokio::time::Instant::now());
1508            let mut rx = [0u8; 1500];
1509            let recv = timeout(remaining, self.datalink.recv(&mut rx)).await;
1510            let (n, source) = match recv {
1511                Ok(Ok(v)) => v,
1512                Ok(Err(e)) => return Err(e.into()),
1513                Err(_) => break,
1514            };
1515
1516            let apdu = extract_apdu(&rx[..n])?;
1517            let first = *apdu.first().ok_or(ClientError::UnsupportedResponse)?;
1518            match ApduType::from_u8(first >> 4) {
1519                Some(ApduType::UnconfirmedRequest) => {
1520                    let mut r = Reader::new(apdu);
1521                    let header = UnconfirmedRequestHeader::decode(&mut r)?;
1522                    if header.service_choice != SERVICE_UNCONFIRMED_COV_NOTIFICATION {
1523                        continue;
1524                    }
1525                    let cov = CovNotificationRequest::decode_after_header(&mut r)?;
1526                    return Ok(Some(into_client_cov_notification(source, false, cov)?));
1527                }
1528                Some(ApduType::ConfirmedRequest) => {
1529                    let mut r = Reader::new(apdu);
1530                    let header = ConfirmedRequestHeader::decode(&mut r)?;
1531                    if header.service_choice != SERVICE_CONFIRMED_COV_NOTIFICATION {
1532                        continue;
1533                    }
1534                    if header.segmented {
1535                        return Err(ClientError::UnsupportedResponse);
1536                    }
1537
1538                    let cov = CovNotificationRequest::decode_after_header(&mut r)?;
1539                    self.send_simple_ack(
1540                        source,
1541                        header.invoke_id,
1542                        SERVICE_CONFIRMED_COV_NOTIFICATION,
1543                    )
1544                    .await?;
1545                    return Ok(Some(into_client_cov_notification(source, true, cov)?));
1546                }
1547                _ => continue,
1548            }
1549        }
1550
1551        Ok(None)
1552    }
1553
1554    pub async fn recv_event_notification(
1555        &self,
1556        wait: Duration,
1557    ) -> Result<Option<EventNotification>, ClientError> {
1558        let _io_lock = self.request_io_lock.lock().await;
1559        let deadline = tokio::time::Instant::now() + wait;
1560
1561        while tokio::time::Instant::now() < deadline {
1562            let remaining = deadline.saturating_duration_since(tokio::time::Instant::now());
1563            let mut rx = [0u8; 1500];
1564            let recv = timeout(remaining, self.datalink.recv(&mut rx)).await;
1565            let (n, source) = match recv {
1566                Ok(Ok(v)) => v,
1567                Ok(Err(e)) => return Err(e.into()),
1568                Err(_) => break,
1569            };
1570
1571            let apdu = extract_apdu(&rx[..n])?;
1572            let first = *apdu.first().ok_or(ClientError::UnsupportedResponse)?;
1573            match ApduType::from_u8(first >> 4) {
1574                Some(ApduType::UnconfirmedRequest) => {
1575                    let mut r = Reader::new(apdu);
1576                    let header = UnconfirmedRequestHeader::decode(&mut r)?;
1577                    if header.service_choice != SERVICE_UNCONFIRMED_EVENT_NOTIFICATION {
1578                        continue;
1579                    }
1580                    let notification = EventNotificationRequest::decode_after_header(&mut r)?;
1581                    return Ok(Some(into_client_event_notification(
1582                        source,
1583                        false,
1584                        notification,
1585                    )));
1586                }
1587                Some(ApduType::ConfirmedRequest) => {
1588                    let mut r = Reader::new(apdu);
1589                    let header = ConfirmedRequestHeader::decode(&mut r)?;
1590                    if header.service_choice != SERVICE_CONFIRMED_EVENT_NOTIFICATION {
1591                        continue;
1592                    }
1593                    if header.segmented {
1594                        return Err(ClientError::UnsupportedResponse);
1595                    }
1596                    let notification = EventNotificationRequest::decode_after_header(&mut r)?;
1597                    self.send_simple_ack(
1598                        source,
1599                        header.invoke_id,
1600                        SERVICE_CONFIRMED_EVENT_NOTIFICATION,
1601                    )
1602                    .await?;
1603                    return Ok(Some(into_client_event_notification(
1604                        source,
1605                        true,
1606                        notification,
1607                    )));
1608                }
1609                _ => continue,
1610            }
1611        }
1612
1613        Ok(None)
1614    }
1615
1616    pub async fn read_property(
1617        &self,
1618        address: DataLinkAddress,
1619        object_id: ObjectId,
1620        property_id: PropertyId,
1621    ) -> Result<ClientDataValue, ClientError> {
1622        let invoke_id = self.next_invoke_id().await;
1623        let req = ReadPropertyRequest {
1624            object_id,
1625            property_id,
1626            array_index: None,
1627            invoke_id,
1628        };
1629        let tx = self.encode_with_growth(|w| {
1630            Npdu::new(0).encode(w)?;
1631            req.encode(w)
1632        })?;
1633        let payload = self
1634            .await_complex_ack_payload_or_error(
1635                address,
1636                &tx,
1637                invoke_id,
1638                SERVICE_READ_PROPERTY,
1639                self.response_timeout,
1640            )
1641            .await?;
1642        let mut pr = Reader::new(&payload);
1643        let parsed = ReadPropertyAck::decode_after_header(&mut pr)?;
1644        into_client_value(parsed.value)
1645    }
1646
1647    pub async fn write_property(
1648        &self,
1649        address: DataLinkAddress,
1650        mut request: WritePropertyRequest<'_>,
1651    ) -> Result<(), ClientError> {
1652        request.invoke_id = self.next_invoke_id().await;
1653        let invoke_id = request.invoke_id;
1654        let tx = self.encode_with_growth(|w| {
1655            Npdu::new(0).encode(w)?;
1656            request.encode(w)
1657        })?;
1658        self.await_simple_ack_or_error(
1659            address,
1660            &tx,
1661            invoke_id,
1662            SERVICE_WRITE_PROPERTY,
1663            self.response_timeout,
1664        )
1665        .await
1666    }
1667
1668    pub async fn read_property_multiple(
1669        &self,
1670        address: DataLinkAddress,
1671        object_id: ObjectId,
1672        property_ids: &[PropertyId],
1673    ) -> Result<Vec<(PropertyId, ClientDataValue)>, ClientError> {
1674        let refs: Vec<PropertyReference> = property_ids
1675            .iter()
1676            .copied()
1677            .map(|property_id| PropertyReference {
1678                property_id,
1679                array_index: None,
1680            })
1681            .collect();
1682        let specs = [ReadAccessSpecification {
1683            object_id,
1684            properties: &refs,
1685        }];
1686
1687        let invoke_id = self.next_invoke_id().await;
1688        let req = ReadPropertyMultipleRequest {
1689            specs: &specs,
1690            invoke_id,
1691        };
1692
1693        let tx = self.encode_with_growth(|w| {
1694            Npdu::new(0).encode(w)?;
1695            req.encode(w)
1696        })?;
1697        let payload = self
1698            .await_complex_ack_payload_or_error(
1699                address,
1700                &tx,
1701                invoke_id,
1702                SERVICE_READ_PROPERTY_MULTIPLE,
1703                self.response_timeout,
1704            )
1705            .await?;
1706        let mut pr = Reader::new(&payload);
1707        let parsed = ReadPropertyMultipleAck::decode_after_header(&mut pr)?;
1708        let mut out = Vec::new();
1709        for access in parsed.results {
1710            if access.object_id != object_id {
1711                continue;
1712            }
1713            for item in access.results {
1714                out.push((item.property_id, into_client_value(item.value)?));
1715            }
1716        }
1717        Ok(out)
1718    }
1719
1720    pub async fn write_property_multiple(
1721        &self,
1722        address: DataLinkAddress,
1723        object_id: ObjectId,
1724        properties: &[PropertyWriteSpec<'_>],
1725    ) -> Result<(), ClientError> {
1726        let invoke_id = self.next_invoke_id().await;
1727        let specs = [WriteAccessSpecification {
1728            object_id,
1729            properties,
1730        }];
1731        let req = WritePropertyMultipleRequest {
1732            specs: &specs,
1733            invoke_id,
1734        };
1735
1736        let tx = self.encode_with_growth(|w| {
1737            Npdu::new(0).encode(w)?;
1738            req.encode(w)
1739        })?;
1740        self.await_simple_ack_or_error(
1741            address,
1742            &tx,
1743            invoke_id,
1744            SERVICE_WRITE_PROPERTY_MULTIPLE,
1745            self.response_timeout,
1746        )
1747        .await
1748    }
1749
1750    /// Send a ConfirmedPrivateTransfer request and return the ack.
1751    pub async fn private_transfer(
1752        &self,
1753        address: DataLinkAddress,
1754        vendor_id: u32,
1755        service_number: u32,
1756        service_parameters: Option<&[u8]>,
1757    ) -> Result<PrivateTransferAck, ClientError> {
1758        let invoke_id = self.next_invoke_id().await;
1759        let req = ConfirmedPrivateTransferRequest {
1760            vendor_id,
1761            service_number,
1762            service_parameters,
1763            invoke_id,
1764        };
1765
1766        let tx = self.encode_with_growth(|w| {
1767            Npdu::new(0).encode(w)?;
1768            req.encode(w)
1769        })?;
1770        let payload = self
1771            .await_complex_ack_payload_or_error(
1772                address,
1773                &tx,
1774                invoke_id,
1775                SERVICE_CONFIRMED_PRIVATE_TRANSFER,
1776                self.response_timeout,
1777            )
1778            .await?;
1779        let mut r = Reader::new(&payload);
1780        PrivateTransferAck::decode(&mut r).map_err(ClientError::from)
1781    }
1782}
1783
1784fn extract_apdu(payload: &[u8]) -> Result<&[u8], ClientError> {
1785    let mut r = Reader::new(payload);
1786    let _npdu = Npdu::decode(&mut r)?;
1787    r.read_exact(r.remaining()).map_err(ClientError::from)
1788}
1789
1790fn remote_service_error(err: BacnetError) -> ClientError {
1791    ClientError::RemoteServiceError {
1792        service_choice: err.service_choice,
1793        error_class_raw: err.error_class,
1794        error_code_raw: err.error_code,
1795        error_class: err.error_class.and_then(ErrorClass::from_u32),
1796        error_code: err.error_code.and_then(ErrorCode::from_u32),
1797    }
1798}
1799
1800fn into_client_value(value: DataValue<'_>) -> Result<ClientDataValue, ClientError> {
1801    Ok(match value {
1802        DataValue::Null => ClientDataValue::Null,
1803        DataValue::Boolean(v) => ClientDataValue::Boolean(v),
1804        DataValue::Unsigned(v) => ClientDataValue::Unsigned(v),
1805        DataValue::Signed(v) => ClientDataValue::Signed(v),
1806        DataValue::Real(v) => ClientDataValue::Real(v),
1807        DataValue::Double(v) => ClientDataValue::Double(v),
1808        DataValue::OctetString(v) => ClientDataValue::OctetString(v.to_vec()),
1809        DataValue::CharacterString(v) => ClientDataValue::CharacterString(v.to_string()),
1810        DataValue::BitString(v) => ClientDataValue::BitString {
1811            unused_bits: v.unused_bits,
1812            data: v.data.to_vec(),
1813        },
1814        DataValue::Enumerated(v) => ClientDataValue::Enumerated(v),
1815        DataValue::Date(v) => ClientDataValue::Date(v),
1816        DataValue::Time(v) => ClientDataValue::Time(v),
1817        DataValue::ObjectId(v) => ClientDataValue::ObjectId(v),
1818        DataValue::Constructed { tag_num, values } => {
1819            let mut children = Vec::with_capacity(values.len());
1820            for child in values {
1821                children.push(into_client_value(child)?);
1822            }
1823            ClientDataValue::Constructed {
1824                tag_num,
1825                values: children,
1826            }
1827        }
1828    })
1829}
1830
1831fn into_client_alarm_summary(value: Vec<CoreAlarmSummaryItem<'_>>) -> Vec<AlarmSummaryItem> {
1832    value
1833        .into_iter()
1834        .map(|item| AlarmSummaryItem {
1835            object_id: item.object_id,
1836            alarm_state_raw: item.alarm_state,
1837            alarm_state: rustbac_core::services::acknowledge_alarm::EventState::from_u32(
1838                item.alarm_state,
1839            ),
1840            acknowledged_transitions: ClientBitString {
1841                unused_bits: item.acknowledged_transitions.unused_bits,
1842                data: item.acknowledged_transitions.data.to_vec(),
1843            },
1844        })
1845        .collect()
1846}
1847
1848fn into_client_enrollment_summary(
1849    value: Vec<CoreEnrollmentSummaryItem>,
1850) -> Vec<EnrollmentSummaryItem> {
1851    value
1852        .into_iter()
1853        .map(|item| EnrollmentSummaryItem {
1854            object_id: item.object_id,
1855            event_type: item.event_type,
1856            event_state_raw: item.event_state,
1857            event_state: rustbac_core::services::acknowledge_alarm::EventState::from_u32(
1858                item.event_state,
1859            ),
1860            priority: item.priority,
1861            notification_class: item.notification_class,
1862        })
1863        .collect()
1864}
1865
1866fn into_client_event_information(
1867    value: Vec<CoreEventSummaryItem<'_>>,
1868) -> Vec<EventInformationItem> {
1869    value
1870        .into_iter()
1871        .map(|item| EventInformationItem {
1872            object_id: item.object_id,
1873            event_state_raw: item.event_state,
1874            event_state: rustbac_core::services::acknowledge_alarm::EventState::from_u32(
1875                item.event_state,
1876            ),
1877            acknowledged_transitions: ClientBitString {
1878                unused_bits: item.acknowledged_transitions.unused_bits,
1879                data: item.acknowledged_transitions.data.to_vec(),
1880            },
1881            notify_type: item.notify_type,
1882            event_enable: ClientBitString {
1883                unused_bits: item.event_enable.unused_bits,
1884                data: item.event_enable.data.to_vec(),
1885            },
1886            event_priorities: item.event_priorities,
1887        })
1888        .collect()
1889}
1890
1891fn into_client_cov_notification(
1892    source: DataLinkAddress,
1893    confirmed: bool,
1894    value: CovNotificationRequest<'_>,
1895) -> Result<CovNotification, ClientError> {
1896    let mut values = Vec::with_capacity(value.values.len());
1897    for property in value.values {
1898        values.push(CovPropertyValue {
1899            property_id: property.property_id,
1900            array_index: property.array_index,
1901            value: into_client_value(property.value)?,
1902            priority: property.priority,
1903        });
1904    }
1905
1906    Ok(CovNotification {
1907        source,
1908        confirmed,
1909        subscriber_process_id: value.subscriber_process_id,
1910        initiating_device_id: value.initiating_device_id,
1911        monitored_object_id: value.monitored_object_id,
1912        time_remaining_seconds: value.time_remaining_seconds,
1913        values,
1914    })
1915}
1916
1917fn into_client_event_notification(
1918    source: DataLinkAddress,
1919    confirmed: bool,
1920    value: EventNotificationRequest<'_>,
1921) -> EventNotification {
1922    EventNotification {
1923        source,
1924        confirmed,
1925        process_id: value.process_id,
1926        initiating_device_id: value.initiating_device_id,
1927        event_object_id: value.event_object_id,
1928        timestamp: value.timestamp,
1929        notification_class: value.notification_class,
1930        priority: value.priority,
1931        event_type: value.event_type,
1932        message_text: value.message_text.map(str::to_string),
1933        notify_type: value.notify_type,
1934        ack_required: value.ack_required,
1935        from_state_raw: value.from_state,
1936        from_state: rustbac_core::services::acknowledge_alarm::EventState::from_u32(
1937            value.from_state,
1938        ),
1939        to_state_raw: value.to_state,
1940        to_state: rustbac_core::services::acknowledge_alarm::EventState::from_u32(value.to_state),
1941    }
1942}
1943
1944fn into_client_read_range(value: ReadRangeAck<'_>) -> Result<ReadRangeResult, ClientError> {
1945    let mut items = Vec::with_capacity(value.items.len());
1946    for item in value.items {
1947        items.push(into_client_value(item)?);
1948    }
1949    Ok(ReadRangeResult {
1950        object_id: value.object_id,
1951        property_id: value.property_id,
1952        array_index: value.array_index,
1953        result_flags: ClientBitString {
1954            unused_bits: value.result_flags.unused_bits,
1955            data: value.result_flags.data.to_vec(),
1956        },
1957        item_count: value.item_count,
1958        items,
1959    })
1960}
1961
1962fn into_client_atomic_read_result(value: AtomicReadFileAck<'_>) -> AtomicReadFileResult {
1963    match value.access_method {
1964        AtomicReadFileAckAccess::Stream {
1965            file_start_position,
1966            file_data,
1967        } => AtomicReadFileResult::Stream {
1968            end_of_file: value.end_of_file,
1969            file_start_position,
1970            file_data: file_data.to_vec(),
1971        },
1972        AtomicReadFileAckAccess::Record {
1973            file_start_record,
1974            returned_record_count,
1975            file_record_data,
1976        } => AtomicReadFileResult::Record {
1977            end_of_file: value.end_of_file,
1978            file_start_record,
1979            returned_record_count,
1980            file_record_data: file_record_data
1981                .into_iter()
1982                .map(|record| record.to_vec())
1983                .collect(),
1984        },
1985    }
1986}
1987
1988fn into_client_atomic_write_result(value: AtomicWriteFileAck) -> AtomicWriteFileResult {
1989    match value {
1990        AtomicWriteFileAck::Stream {
1991            file_start_position,
1992        } => AtomicWriteFileResult::Stream {
1993            file_start_position,
1994        },
1995        AtomicWriteFileAck::Record { file_start_record } => {
1996            AtomicWriteFileResult::Record { file_start_record }
1997        }
1998    }
1999}
2000
2001#[cfg(test)]
2002mod tests {
2003    use super::BacnetClient;
2004    use crate::{
2005        AlarmSummaryItem, AtomicReadFileResult, AtomicWriteFileResult, ClientDataValue,
2006        EnrollmentSummaryItem, EventInformationItem, EventNotification,
2007    };
2008    use rustbac_core::apdu::{
2009        ApduType, ComplexAckHeader, ConfirmedRequestHeader, SegmentAck, SimpleAck,
2010        UnconfirmedRequestHeader,
2011    };
2012    use rustbac_core::encoding::{
2013        primitives::{
2014            decode_signed, decode_unsigned, encode_app_real, encode_ctx_character_string,
2015            encode_ctx_object_id, encode_ctx_unsigned,
2016        },
2017        reader::Reader,
2018        tag::{AppTag, Tag},
2019        writer::Writer,
2020    };
2021    use rustbac_core::npdu::Npdu;
2022    use rustbac_core::services::acknowledge_alarm::{
2023        AcknowledgeAlarmRequest, EventState, TimeStamp, SERVICE_ACKNOWLEDGE_ALARM,
2024    };
2025    use rustbac_core::services::alarm_summary::SERVICE_GET_ALARM_SUMMARY;
2026    use rustbac_core::services::atomic_read_file::SERVICE_ATOMIC_READ_FILE;
2027    use rustbac_core::services::atomic_write_file::SERVICE_ATOMIC_WRITE_FILE;
2028    use rustbac_core::services::cov_notification::{
2029        SERVICE_CONFIRMED_COV_NOTIFICATION, SERVICE_UNCONFIRMED_COV_NOTIFICATION,
2030    };
2031    use rustbac_core::services::device_management::{
2032        DeviceCommunicationState, ReinitializeState, SERVICE_DEVICE_COMMUNICATION_CONTROL,
2033        SERVICE_REINITIALIZE_DEVICE,
2034    };
2035    use rustbac_core::services::enrollment_summary::SERVICE_GET_ENROLLMENT_SUMMARY;
2036    use rustbac_core::services::event_information::SERVICE_GET_EVENT_INFORMATION;
2037    use rustbac_core::services::event_notification::{
2038        SERVICE_CONFIRMED_EVENT_NOTIFICATION, SERVICE_UNCONFIRMED_EVENT_NOTIFICATION,
2039    };
2040    use rustbac_core::services::list_element::{
2041        AddListElementRequest, RemoveListElementRequest, SERVICE_ADD_LIST_ELEMENT,
2042        SERVICE_REMOVE_LIST_ELEMENT,
2043    };
2044    use rustbac_core::services::object_management::{SERVICE_CREATE_OBJECT, SERVICE_DELETE_OBJECT};
2045    use rustbac_core::services::read_property::SERVICE_READ_PROPERTY;
2046    use rustbac_core::services::read_property_multiple::SERVICE_READ_PROPERTY_MULTIPLE;
2047    use rustbac_core::services::read_range::SERVICE_READ_RANGE;
2048    use rustbac_core::services::subscribe_cov::{SubscribeCovRequest, SERVICE_SUBSCRIBE_COV};
2049    use rustbac_core::services::subscribe_cov_property::{
2050        SubscribeCovPropertyRequest, SERVICE_SUBSCRIBE_COV_PROPERTY,
2051    };
2052    use rustbac_core::services::time_synchronization::SERVICE_TIME_SYNCHRONIZATION;
2053    use rustbac_core::services::who_has::{SERVICE_I_HAVE, SERVICE_WHO_HAS};
2054    use rustbac_core::services::write_property_multiple::{
2055        PropertyWriteSpec, SERVICE_WRITE_PROPERTY_MULTIPLE,
2056    };
2057    use rustbac_core::types::{DataValue, Date, ObjectId, ObjectType, PropertyId, Time};
2058    use rustbac_datalink::{DataLink, DataLinkAddress, DataLinkError};
2059    use std::collections::VecDeque;
2060    use std::sync::Arc;
2061    use std::time::Duration;
2062    use tokio::sync::Mutex;
2063
2064    #[derive(Debug, Default)]
2065    struct MockState {
2066        sent: Mutex<Vec<(DataLinkAddress, Vec<u8>)>>,
2067        recv: Mutex<VecDeque<(Vec<u8>, DataLinkAddress)>>,
2068    }
2069
2070    #[derive(Debug, Clone)]
2071    struct MockDataLink {
2072        state: Arc<MockState>,
2073    }
2074
2075    impl MockDataLink {
2076        fn new() -> (Self, Arc<MockState>) {
2077            let state = Arc::new(MockState::default());
2078            (
2079                Self {
2080                    state: state.clone(),
2081                },
2082                state,
2083            )
2084        }
2085    }
2086
2087    impl DataLink for MockDataLink {
2088        async fn send(
2089            &self,
2090            address: DataLinkAddress,
2091            payload: &[u8],
2092        ) -> Result<(), DataLinkError> {
2093            self.state
2094                .sent
2095                .lock()
2096                .await
2097                .push((address, payload.to_vec()));
2098            Ok(())
2099        }
2100
2101        async fn recv(&self, buf: &mut [u8]) -> Result<(usize, DataLinkAddress), DataLinkError> {
2102            let Some((payload, addr)) = self.state.recv.lock().await.pop_front() else {
2103                return Err(DataLinkError::InvalidFrame);
2104            };
2105            if payload.len() > buf.len() {
2106                return Err(DataLinkError::FrameTooLarge);
2107            }
2108            buf[..payload.len()].copy_from_slice(&payload);
2109            Ok((payload.len(), addr))
2110        }
2111    }
2112
2113    fn with_npdu(apdu: &[u8]) -> Vec<u8> {
2114        let mut out = [0u8; 512];
2115        let mut w = Writer::new(&mut out);
2116        Npdu::new(0).encode(&mut w).unwrap();
2117        w.write_all(apdu).unwrap();
2118        w.as_written().to_vec()
2119    }
2120
2121    fn read_range_ack_apdu(invoke_id: u8, object_id: ObjectId) -> Vec<u8> {
2122        let mut apdu_buf = [0u8; 256];
2123        let mut w = Writer::new(&mut apdu_buf);
2124        ComplexAckHeader {
2125            segmented: false,
2126            more_follows: false,
2127            invoke_id,
2128            sequence_number: None,
2129            proposed_window_size: None,
2130            service_choice: SERVICE_READ_RANGE,
2131        }
2132        .encode(&mut w)
2133        .unwrap();
2134        encode_ctx_object_id(&mut w, 0, object_id.raw()).unwrap();
2135        encode_ctx_unsigned(&mut w, 1, PropertyId::PresentValue.to_u32()).unwrap();
2136        Tag::Context { tag_num: 3, len: 2 }.encode(&mut w).unwrap();
2137        w.write_u8(5).unwrap();
2138        w.write_u8(0b1110_0000).unwrap();
2139        encode_ctx_unsigned(&mut w, 4, 2).unwrap();
2140        Tag::Opening { tag_num: 5 }.encode(&mut w).unwrap();
2141        encode_app_real(&mut w, 42.0).unwrap();
2142        encode_app_real(&mut w, 43.0).unwrap();
2143        Tag::Closing { tag_num: 5 }.encode(&mut w).unwrap();
2144        w.as_written().to_vec()
2145    }
2146
2147    fn atomic_read_file_stream_ack_apdu(invoke_id: u8, eof: bool, data: &[u8]) -> Vec<u8> {
2148        let mut apdu_buf = [0u8; 256];
2149        let mut w = Writer::new(&mut apdu_buf);
2150        ComplexAckHeader {
2151            segmented: false,
2152            more_follows: false,
2153            invoke_id,
2154            sequence_number: None,
2155            proposed_window_size: None,
2156            service_choice: SERVICE_ATOMIC_READ_FILE,
2157        }
2158        .encode(&mut w)
2159        .unwrap();
2160        Tag::Application {
2161            tag: AppTag::Boolean,
2162            len: if eof { 1 } else { 0 },
2163        }
2164        .encode(&mut w)
2165        .unwrap();
2166        Tag::Opening { tag_num: 0 }.encode(&mut w).unwrap();
2167        Tag::Application {
2168            tag: AppTag::SignedInt,
2169            len: 1,
2170        }
2171        .encode(&mut w)
2172        .unwrap();
2173        w.write_u8(0).unwrap();
2174        Tag::Application {
2175            tag: AppTag::OctetString,
2176            len: data.len() as u32,
2177        }
2178        .encode(&mut w)
2179        .unwrap();
2180        w.write_all(data).unwrap();
2181        Tag::Closing { tag_num: 0 }.encode(&mut w).unwrap();
2182        w.as_written().to_vec()
2183    }
2184
2185    fn atomic_read_file_record_ack_apdu(invoke_id: u8) -> Vec<u8> {
2186        let mut apdu_buf = [0u8; 256];
2187        let mut w = Writer::new(&mut apdu_buf);
2188        ComplexAckHeader {
2189            segmented: false,
2190            more_follows: false,
2191            invoke_id,
2192            sequence_number: None,
2193            proposed_window_size: None,
2194            service_choice: SERVICE_ATOMIC_READ_FILE,
2195        }
2196        .encode(&mut w)
2197        .unwrap();
2198        Tag::Application {
2199            tag: AppTag::Boolean,
2200            len: 0,
2201        }
2202        .encode(&mut w)
2203        .unwrap();
2204        Tag::Opening { tag_num: 1 }.encode(&mut w).unwrap();
2205        Tag::Application {
2206            tag: AppTag::SignedInt,
2207            len: 1,
2208        }
2209        .encode(&mut w)
2210        .unwrap();
2211        w.write_u8(7).unwrap();
2212        Tag::Application {
2213            tag: AppTag::UnsignedInt,
2214            len: 1,
2215        }
2216        .encode(&mut w)
2217        .unwrap();
2218        w.write_u8(2).unwrap();
2219        Tag::Application {
2220            tag: AppTag::OctetString,
2221            len: 2,
2222        }
2223        .encode(&mut w)
2224        .unwrap();
2225        w.write_all(&[0x01, 0x02]).unwrap();
2226        Tag::Application {
2227            tag: AppTag::OctetString,
2228            len: 3,
2229        }
2230        .encode(&mut w)
2231        .unwrap();
2232        w.write_all(&[0x03, 0x04, 0x05]).unwrap();
2233        Tag::Closing { tag_num: 1 }.encode(&mut w).unwrap();
2234        w.as_written().to_vec()
2235    }
2236
2237    fn atomic_write_file_stream_ack_apdu(invoke_id: u8, start_position: i32) -> Vec<u8> {
2238        let mut apdu_buf = [0u8; 64];
2239        let mut w = Writer::new(&mut apdu_buf);
2240        ComplexAckHeader {
2241            segmented: false,
2242            more_follows: false,
2243            invoke_id,
2244            sequence_number: None,
2245            proposed_window_size: None,
2246            service_choice: SERVICE_ATOMIC_WRITE_FILE,
2247        }
2248        .encode(&mut w)
2249        .unwrap();
2250        Tag::Context { tag_num: 0, len: 2 }.encode(&mut w).unwrap();
2251        w.write_all(&(start_position as i16).to_be_bytes()).unwrap();
2252        w.as_written().to_vec()
2253    }
2254
2255    fn atomic_write_file_record_ack_apdu(invoke_id: u8, start_record: i32) -> Vec<u8> {
2256        let mut apdu_buf = [0u8; 64];
2257        let mut w = Writer::new(&mut apdu_buf);
2258        ComplexAckHeader {
2259            segmented: false,
2260            more_follows: false,
2261            invoke_id,
2262            sequence_number: None,
2263            proposed_window_size: None,
2264            service_choice: SERVICE_ATOMIC_WRITE_FILE,
2265        }
2266        .encode(&mut w)
2267        .unwrap();
2268        Tag::Context { tag_num: 1, len: 1 }.encode(&mut w).unwrap();
2269        w.write_u8(start_record as u8).unwrap();
2270        w.as_written().to_vec()
2271    }
2272
2273    fn create_object_ack_apdu(invoke_id: u8, object_id: ObjectId) -> Vec<u8> {
2274        let mut apdu_buf = [0u8; 64];
2275        let mut w = Writer::new(&mut apdu_buf);
2276        ComplexAckHeader {
2277            segmented: false,
2278            more_follows: false,
2279            invoke_id,
2280            sequence_number: None,
2281            proposed_window_size: None,
2282            service_choice: SERVICE_CREATE_OBJECT,
2283        }
2284        .encode(&mut w)
2285        .unwrap();
2286        encode_ctx_object_id(&mut w, 0, object_id.raw()).unwrap();
2287        w.as_written().to_vec()
2288    }
2289
2290    fn get_alarm_summary_ack_apdu(invoke_id: u8) -> Vec<u8> {
2291        let mut apdu_buf = [0u8; 128];
2292        let mut w = Writer::new(&mut apdu_buf);
2293        ComplexAckHeader {
2294            segmented: false,
2295            more_follows: false,
2296            invoke_id,
2297            sequence_number: None,
2298            proposed_window_size: None,
2299            service_choice: SERVICE_GET_ALARM_SUMMARY,
2300        }
2301        .encode(&mut w)
2302        .unwrap();
2303        encode_ctx_object_id(&mut w, 0, ObjectId::new(ObjectType::AnalogInput, 1).raw()).unwrap();
2304        encode_ctx_unsigned(&mut w, 1, 1).unwrap();
2305        Tag::Context { tag_num: 2, len: 2 }.encode(&mut w).unwrap();
2306        w.write_u8(5).unwrap();
2307        w.write_u8(0b1110_0000).unwrap();
2308
2309        encode_ctx_object_id(&mut w, 0, ObjectId::new(ObjectType::BinaryInput, 2).raw()).unwrap();
2310        encode_ctx_unsigned(&mut w, 1, 0).unwrap();
2311        Tag::Context { tag_num: 2, len: 2 }.encode(&mut w).unwrap();
2312        w.write_u8(5).unwrap();
2313        w.write_u8(0b1100_0000).unwrap();
2314        w.as_written().to_vec()
2315    }
2316
2317    fn get_enrollment_summary_ack_apdu(invoke_id: u8) -> Vec<u8> {
2318        let mut apdu_buf = [0u8; 160];
2319        let mut w = Writer::new(&mut apdu_buf);
2320        ComplexAckHeader {
2321            segmented: false,
2322            more_follows: false,
2323            invoke_id,
2324            sequence_number: None,
2325            proposed_window_size: None,
2326            service_choice: SERVICE_GET_ENROLLMENT_SUMMARY,
2327        }
2328        .encode(&mut w)
2329        .unwrap();
2330        encode_ctx_object_id(&mut w, 0, ObjectId::new(ObjectType::AnalogInput, 7).raw()).unwrap();
2331        encode_ctx_unsigned(&mut w, 1, 1).unwrap();
2332        encode_ctx_unsigned(&mut w, 2, 2).unwrap();
2333        encode_ctx_unsigned(&mut w, 3, 200).unwrap();
2334        encode_ctx_unsigned(&mut w, 4, 10).unwrap();
2335
2336        encode_ctx_object_id(&mut w, 0, ObjectId::new(ObjectType::BinaryInput, 8).raw()).unwrap();
2337        encode_ctx_unsigned(&mut w, 1, 0).unwrap();
2338        encode_ctx_unsigned(&mut w, 2, 0).unwrap();
2339        encode_ctx_unsigned(&mut w, 3, 20).unwrap();
2340        encode_ctx_unsigned(&mut w, 4, 11).unwrap();
2341        w.as_written().to_vec()
2342    }
2343
2344    fn get_event_information_ack_apdu(invoke_id: u8) -> Vec<u8> {
2345        let mut apdu_buf = [0u8; 256];
2346        let mut w = Writer::new(&mut apdu_buf);
2347        ComplexAckHeader {
2348            segmented: false,
2349            more_follows: false,
2350            invoke_id,
2351            sequence_number: None,
2352            proposed_window_size: None,
2353            service_choice: SERVICE_GET_EVENT_INFORMATION,
2354        }
2355        .encode(&mut w)
2356        .unwrap();
2357        Tag::Opening { tag_num: 0 }.encode(&mut w).unwrap();
2358        encode_ctx_object_id(&mut w, 0, ObjectId::new(ObjectType::AnalogInput, 1).raw()).unwrap();
2359        encode_ctx_unsigned(&mut w, 1, 2).unwrap();
2360        Tag::Context { tag_num: 2, len: 2 }.encode(&mut w).unwrap();
2361        w.write_u8(5).unwrap();
2362        w.write_u8(0b1110_0000).unwrap();
2363        Tag::Opening { tag_num: 3 }.encode(&mut w).unwrap();
2364        Tag::Opening { tag_num: 0 }.encode(&mut w).unwrap();
2365        encode_ctx_unsigned(&mut w, 1, 1).unwrap();
2366        Tag::Closing { tag_num: 0 }.encode(&mut w).unwrap();
2367        Tag::Closing { tag_num: 3 }.encode(&mut w).unwrap();
2368        encode_ctx_unsigned(&mut w, 4, 0).unwrap();
2369        Tag::Context { tag_num: 5, len: 2 }.encode(&mut w).unwrap();
2370        w.write_u8(5).unwrap();
2371        w.write_u8(0b1100_0000).unwrap();
2372        Tag::Opening { tag_num: 6 }.encode(&mut w).unwrap();
2373        encode_ctx_unsigned(&mut w, 0, 1).unwrap();
2374        encode_ctx_unsigned(&mut w, 1, 2).unwrap();
2375        encode_ctx_unsigned(&mut w, 2, 3).unwrap();
2376        Tag::Closing { tag_num: 6 }.encode(&mut w).unwrap();
2377        Tag::Closing { tag_num: 0 }.encode(&mut w).unwrap();
2378        Tag::Context { tag_num: 1, len: 1 }.encode(&mut w).unwrap();
2379        w.write_u8(0).unwrap();
2380        w.as_written().to_vec()
2381    }
2382
2383    #[tokio::test]
2384    async fn who_has_object_name_collects_i_have() {
2385        let (dl, state) = MockDataLink::new();
2386        let client = BacnetClient::with_datalink(dl);
2387        let addr = DataLinkAddress::Ip(([192, 168, 1, 31], 47808).into());
2388
2389        let mut apdu = [0u8; 128];
2390        let mut w = Writer::new(&mut apdu);
2391        UnconfirmedRequestHeader {
2392            service_choice: SERVICE_I_HAVE,
2393        }
2394        .encode(&mut w)
2395        .unwrap();
2396        encode_ctx_object_id(&mut w, 0, ObjectId::new(ObjectType::Device, 10).raw()).unwrap();
2397        encode_ctx_object_id(&mut w, 1, ObjectId::new(ObjectType::AnalogInput, 7).raw()).unwrap();
2398        encode_ctx_character_string(&mut w, 2, "Zone Temp").unwrap();
2399
2400        state
2401            .recv
2402            .lock()
2403            .await
2404            .push_back((with_npdu(w.as_written()), addr));
2405
2406        let results = client
2407            .who_has_object_name(None, "Zone Temp", Duration::from_millis(10))
2408            .await
2409            .unwrap();
2410        assert_eq!(results.len(), 1);
2411        assert_eq!(results[0].address, addr);
2412        assert_eq!(results[0].device_id, ObjectId::new(ObjectType::Device, 10));
2413        assert_eq!(
2414            results[0].object_id,
2415            ObjectId::new(ObjectType::AnalogInput, 7)
2416        );
2417        assert_eq!(results[0].object_name, "Zone Temp");
2418
2419        let sent = state.sent.lock().await;
2420        assert_eq!(sent.len(), 1);
2421        let mut r = Reader::new(&sent[0].1);
2422        let _npdu = Npdu::decode(&mut r).unwrap();
2423        let hdr = UnconfirmedRequestHeader::decode(&mut r).unwrap();
2424        assert_eq!(hdr.service_choice, SERVICE_WHO_HAS);
2425    }
2426
2427    #[tokio::test]
2428    async fn device_communication_control_handles_simple_ack() {
2429        let (dl, state) = MockDataLink::new();
2430        let client = BacnetClient::with_datalink(dl).with_response_timeout(Duration::from_secs(1));
2431        let addr = DataLinkAddress::Ip(([192, 168, 1, 32], 47808).into());
2432
2433        let mut apdu = [0u8; 32];
2434        let mut w = Writer::new(&mut apdu);
2435        SimpleAck {
2436            invoke_id: 1,
2437            service_choice: SERVICE_DEVICE_COMMUNICATION_CONTROL,
2438        }
2439        .encode(&mut w)
2440        .unwrap();
2441        state
2442            .recv
2443            .lock()
2444            .await
2445            .push_back((with_npdu(w.as_written()), addr));
2446
2447        client
2448            .device_communication_control(addr, Some(30), DeviceCommunicationState::Disable, None)
2449            .await
2450            .unwrap();
2451
2452        let sent = state.sent.lock().await;
2453        assert_eq!(sent.len(), 1);
2454        let mut r = Reader::new(&sent[0].1);
2455        let _npdu = Npdu::decode(&mut r).unwrap();
2456        let hdr = ConfirmedRequestHeader::decode(&mut r).unwrap();
2457        assert_eq!(hdr.service_choice, SERVICE_DEVICE_COMMUNICATION_CONTROL);
2458    }
2459
2460    #[tokio::test]
2461    async fn reinitialize_device_handles_simple_ack() {
2462        let (dl, state) = MockDataLink::new();
2463        let client = BacnetClient::with_datalink(dl).with_response_timeout(Duration::from_secs(1));
2464        let addr = DataLinkAddress::Ip(([192, 168, 1, 33], 47808).into());
2465
2466        let mut apdu = [0u8; 32];
2467        let mut w = Writer::new(&mut apdu);
2468        SimpleAck {
2469            invoke_id: 1,
2470            service_choice: SERVICE_REINITIALIZE_DEVICE,
2471        }
2472        .encode(&mut w)
2473        .unwrap();
2474        state
2475            .recv
2476            .lock()
2477            .await
2478            .push_back((with_npdu(w.as_written()), addr));
2479
2480        client
2481            .reinitialize_device(addr, ReinitializeState::ActivateChanges, Some("pw"))
2482            .await
2483            .unwrap();
2484
2485        let sent = state.sent.lock().await;
2486        assert_eq!(sent.len(), 1);
2487        let mut r = Reader::new(&sent[0].1);
2488        let _npdu = Npdu::decode(&mut r).unwrap();
2489        let hdr = ConfirmedRequestHeader::decode(&mut r).unwrap();
2490        assert_eq!(hdr.service_choice, SERVICE_REINITIALIZE_DEVICE);
2491    }
2492
2493    #[tokio::test]
2494    async fn time_synchronize_sends_unconfirmed_request() {
2495        let (dl, state) = MockDataLink::new();
2496        let client = BacnetClient::with_datalink(dl);
2497        let addr = DataLinkAddress::Ip(([192, 168, 1, 34], 47808).into());
2498
2499        client
2500            .time_synchronize(
2501                addr,
2502                Date {
2503                    year_since_1900: 126,
2504                    month: 2,
2505                    day: 7,
2506                    weekday: 6,
2507                },
2508                Time {
2509                    hour: 10,
2510                    minute: 11,
2511                    second: 12,
2512                    hundredths: 13,
2513                },
2514                false,
2515            )
2516            .await
2517            .unwrap();
2518
2519        let sent = state.sent.lock().await;
2520        assert_eq!(sent.len(), 1);
2521        let mut r = Reader::new(&sent[0].1);
2522        let _npdu = Npdu::decode(&mut r).unwrap();
2523        let hdr = UnconfirmedRequestHeader::decode(&mut r).unwrap();
2524        assert_eq!(hdr.service_choice, SERVICE_TIME_SYNCHRONIZATION);
2525    }
2526
2527    #[tokio::test]
2528    async fn get_alarm_summary_decodes_complex_ack() {
2529        let (dl, state) = MockDataLink::new();
2530        let client = BacnetClient::with_datalink(dl).with_response_timeout(Duration::from_secs(1));
2531        let addr = DataLinkAddress::Ip(([192, 168, 1, 38], 47808).into());
2532
2533        state
2534            .recv
2535            .lock()
2536            .await
2537            .push_back((with_npdu(&get_alarm_summary_ack_apdu(1)), addr));
2538
2539        let summaries = client.get_alarm_summary(addr).await.unwrap();
2540        assert_eq!(summaries.len(), 2);
2541        assert_eq!(
2542            summaries[0],
2543            AlarmSummaryItem {
2544                object_id: ObjectId::new(ObjectType::AnalogInput, 1),
2545                alarm_state_raw: 1,
2546                alarm_state: Some(EventState::Fault),
2547                acknowledged_transitions: crate::ClientBitString {
2548                    unused_bits: 5,
2549                    data: vec![0b1110_0000],
2550                },
2551            }
2552        );
2553        assert_eq!(
2554            summaries[1],
2555            AlarmSummaryItem {
2556                object_id: ObjectId::new(ObjectType::BinaryInput, 2),
2557                alarm_state_raw: 0,
2558                alarm_state: Some(EventState::Normal),
2559                acknowledged_transitions: crate::ClientBitString {
2560                    unused_bits: 5,
2561                    data: vec![0b1100_0000],
2562                },
2563            }
2564        );
2565
2566        let sent = state.sent.lock().await;
2567        assert_eq!(sent.len(), 1);
2568        let mut r = Reader::new(&sent[0].1);
2569        let _npdu = Npdu::decode(&mut r).unwrap();
2570        let hdr = ConfirmedRequestHeader::decode(&mut r).unwrap();
2571        assert_eq!(hdr.service_choice, SERVICE_GET_ALARM_SUMMARY);
2572    }
2573
2574    #[tokio::test]
2575    async fn get_enrollment_summary_decodes_complex_ack() {
2576        let (dl, state) = MockDataLink::new();
2577        let client = BacnetClient::with_datalink(dl).with_response_timeout(Duration::from_secs(1));
2578        let addr = DataLinkAddress::Ip(([192, 168, 1, 37], 47808).into());
2579
2580        state
2581            .recv
2582            .lock()
2583            .await
2584            .push_back((with_npdu(&get_enrollment_summary_ack_apdu(1)), addr));
2585
2586        let summaries = client.get_enrollment_summary(addr).await.unwrap();
2587        assert_eq!(summaries.len(), 2);
2588        assert_eq!(
2589            summaries[0],
2590            EnrollmentSummaryItem {
2591                object_id: ObjectId::new(ObjectType::AnalogInput, 7),
2592                event_type: 1,
2593                event_state_raw: 2,
2594                event_state: Some(EventState::Offnormal),
2595                priority: 200,
2596                notification_class: 10,
2597            }
2598        );
2599        assert_eq!(
2600            summaries[1],
2601            EnrollmentSummaryItem {
2602                object_id: ObjectId::new(ObjectType::BinaryInput, 8),
2603                event_type: 0,
2604                event_state_raw: 0,
2605                event_state: Some(EventState::Normal),
2606                priority: 20,
2607                notification_class: 11,
2608            }
2609        );
2610
2611        let sent = state.sent.lock().await;
2612        assert_eq!(sent.len(), 1);
2613        let mut r = Reader::new(&sent[0].1);
2614        let _npdu = Npdu::decode(&mut r).unwrap();
2615        let hdr = ConfirmedRequestHeader::decode(&mut r).unwrap();
2616        assert_eq!(hdr.service_choice, SERVICE_GET_ENROLLMENT_SUMMARY);
2617    }
2618
2619    #[tokio::test]
2620    async fn get_event_information_decodes_complex_ack() {
2621        let (dl, state) = MockDataLink::new();
2622        let client = BacnetClient::with_datalink(dl).with_response_timeout(Duration::from_secs(1));
2623        let addr = DataLinkAddress::Ip(([192, 168, 1, 57], 47808).into());
2624
2625        state
2626            .recv
2627            .lock()
2628            .await
2629            .push_back((with_npdu(&get_event_information_ack_apdu(1)), addr));
2630
2631        let result = client.get_event_information(addr, None).await.unwrap();
2632        assert!(!result.more_events);
2633        assert_eq!(result.summaries.len(), 1);
2634        assert_eq!(
2635            result.summaries[0],
2636            EventInformationItem {
2637                object_id: ObjectId::new(ObjectType::AnalogInput, 1),
2638                event_state_raw: 2,
2639                event_state: Some(EventState::Offnormal),
2640                acknowledged_transitions: crate::ClientBitString {
2641                    unused_bits: 5,
2642                    data: vec![0b1110_0000],
2643                },
2644                notify_type: 0,
2645                event_enable: crate::ClientBitString {
2646                    unused_bits: 5,
2647                    data: vec![0b1100_0000],
2648                },
2649                event_priorities: [1, 2, 3],
2650            }
2651        );
2652    }
2653
2654    #[tokio::test]
2655    async fn acknowledge_alarm_handles_simple_ack() {
2656        let (dl, state) = MockDataLink::new();
2657        let client = BacnetClient::with_datalink(dl).with_response_timeout(Duration::from_secs(1));
2658        let addr = DataLinkAddress::Ip(([192, 168, 1, 39], 47808).into());
2659
2660        let mut apdu = [0u8; 32];
2661        let mut w = Writer::new(&mut apdu);
2662        SimpleAck {
2663            invoke_id: 1,
2664            service_choice: SERVICE_ACKNOWLEDGE_ALARM,
2665        }
2666        .encode(&mut w)
2667        .unwrap();
2668        state
2669            .recv
2670            .lock()
2671            .await
2672            .push_back((with_npdu(w.as_written()), addr));
2673
2674        client
2675            .acknowledge_alarm(
2676                addr,
2677                AcknowledgeAlarmRequest {
2678                    acknowledging_process_id: 10,
2679                    event_object_id: ObjectId::new(ObjectType::AnalogInput, 1),
2680                    event_state_acknowledged: EventState::Offnormal,
2681                    event_time_stamp: TimeStamp::SequenceNumber(42),
2682                    acknowledgment_source: "operator",
2683                    time_of_acknowledgment: TimeStamp::DateTime {
2684                        date: Date {
2685                            year_since_1900: 126,
2686                            month: 2,
2687                            day: 7,
2688                            weekday: 6,
2689                        },
2690                        time: Time {
2691                            hour: 10,
2692                            minute: 11,
2693                            second: 12,
2694                            hundredths: 13,
2695                        },
2696                    },
2697                    invoke_id: 0,
2698                },
2699            )
2700            .await
2701            .unwrap();
2702
2703        let sent = state.sent.lock().await;
2704        assert_eq!(sent.len(), 1);
2705        let mut r = Reader::new(&sent[0].1);
2706        let _npdu = Npdu::decode(&mut r).unwrap();
2707        let hdr = ConfirmedRequestHeader::decode(&mut r).unwrap();
2708        assert_eq!(hdr.service_choice, SERVICE_ACKNOWLEDGE_ALARM);
2709    }
2710
2711    #[tokio::test]
2712    async fn create_object_by_type_decodes_complex_ack() {
2713        let (dl, state) = MockDataLink::new();
2714        let client = BacnetClient::with_datalink(dl).with_response_timeout(Duration::from_secs(1));
2715        let addr = DataLinkAddress::Ip(([192, 168, 1, 50], 47808).into());
2716        let created = ObjectId::new(ObjectType::AnalogValue, 42);
2717
2718        state
2719            .recv
2720            .lock()
2721            .await
2722            .push_back((with_npdu(&create_object_ack_apdu(1, created)), addr));
2723
2724        let result = client
2725            .create_object_by_type(addr, ObjectType::AnalogValue)
2726            .await
2727            .unwrap();
2728        assert_eq!(result, created);
2729
2730        let sent = state.sent.lock().await;
2731        let mut r = Reader::new(&sent[0].1);
2732        let _npdu = Npdu::decode(&mut r).unwrap();
2733        let hdr = ConfirmedRequestHeader::decode(&mut r).unwrap();
2734        assert_eq!(hdr.service_choice, SERVICE_CREATE_OBJECT);
2735    }
2736
2737    #[tokio::test]
2738    async fn delete_object_handles_simple_ack() {
2739        let (dl, state) = MockDataLink::new();
2740        let client = BacnetClient::with_datalink(dl).with_response_timeout(Duration::from_secs(1));
2741        let addr = DataLinkAddress::Ip(([192, 168, 1, 51], 47808).into());
2742
2743        let mut apdu = [0u8; 32];
2744        let mut w = Writer::new(&mut apdu);
2745        SimpleAck {
2746            invoke_id: 1,
2747            service_choice: SERVICE_DELETE_OBJECT,
2748        }
2749        .encode(&mut w)
2750        .unwrap();
2751        state
2752            .recv
2753            .lock()
2754            .await
2755            .push_back((with_npdu(w.as_written()), addr));
2756
2757        client
2758            .delete_object(addr, ObjectId::new(ObjectType::AnalogValue, 42))
2759            .await
2760            .unwrap();
2761    }
2762
2763    #[tokio::test]
2764    async fn add_list_element_handles_simple_ack() {
2765        let (dl, state) = MockDataLink::new();
2766        let client = BacnetClient::with_datalink(dl).with_response_timeout(Duration::from_secs(1));
2767        let addr = DataLinkAddress::Ip(([192, 168, 1, 52], 47808).into());
2768
2769        let mut apdu = [0u8; 32];
2770        let mut w = Writer::new(&mut apdu);
2771        SimpleAck {
2772            invoke_id: 1,
2773            service_choice: SERVICE_ADD_LIST_ELEMENT,
2774        }
2775        .encode(&mut w)
2776        .unwrap();
2777        state
2778            .recv
2779            .lock()
2780            .await
2781            .push_back((with_npdu(w.as_written()), addr));
2782
2783        let values = [DataValue::Unsigned(1), DataValue::Unsigned(2)];
2784        client
2785            .add_list_element(
2786                addr,
2787                AddListElementRequest {
2788                    object_id: ObjectId::new(ObjectType::AnalogValue, 1),
2789                    property_id: PropertyId::Proprietary(512),
2790                    array_index: None,
2791                    elements: &values,
2792                    invoke_id: 0,
2793                },
2794            )
2795            .await
2796            .unwrap();
2797    }
2798
2799    #[tokio::test]
2800    async fn remove_list_element_handles_simple_ack() {
2801        let (dl, state) = MockDataLink::new();
2802        let client = BacnetClient::with_datalink(dl).with_response_timeout(Duration::from_secs(1));
2803        let addr = DataLinkAddress::Ip(([192, 168, 1, 53], 47808).into());
2804
2805        let mut apdu = [0u8; 32];
2806        let mut w = Writer::new(&mut apdu);
2807        SimpleAck {
2808            invoke_id: 1,
2809            service_choice: SERVICE_REMOVE_LIST_ELEMENT,
2810        }
2811        .encode(&mut w)
2812        .unwrap();
2813        state
2814            .recv
2815            .lock()
2816            .await
2817            .push_back((with_npdu(w.as_written()), addr));
2818
2819        let values = [DataValue::Unsigned(1)];
2820        client
2821            .remove_list_element(
2822                addr,
2823                RemoveListElementRequest {
2824                    object_id: ObjectId::new(ObjectType::AnalogValue, 1),
2825                    property_id: PropertyId::Proprietary(513),
2826                    array_index: None,
2827                    elements: &values,
2828                    invoke_id: 0,
2829                },
2830            )
2831            .await
2832            .unwrap();
2833    }
2834
2835    #[tokio::test]
2836    async fn atomic_read_file_stream_decodes_complex_ack() {
2837        let (dl, state) = MockDataLink::new();
2838        let client = BacnetClient::with_datalink(dl).with_response_timeout(Duration::from_secs(1));
2839        let addr = DataLinkAddress::Ip(([192, 168, 1, 40], 47808).into());
2840        let file_object = ObjectId::new(ObjectType::File, 2);
2841
2842        state.recv.lock().await.push_back((
2843            with_npdu(&atomic_read_file_stream_ack_apdu(
2844                1,
2845                true,
2846                &[0xAA, 0xBB, 0xCC],
2847            )),
2848            addr,
2849        ));
2850
2851        let result = client
2852            .atomic_read_file_stream(addr, file_object, 0, 3)
2853            .await
2854            .unwrap();
2855
2856        assert_eq!(
2857            result,
2858            AtomicReadFileResult::Stream {
2859                end_of_file: true,
2860                file_start_position: 0,
2861                file_data: vec![0xAA, 0xBB, 0xCC],
2862            }
2863        );
2864
2865        let sent = state.sent.lock().await;
2866        assert_eq!(sent.len(), 1);
2867        let mut r = Reader::new(&sent[0].1);
2868        let _npdu = Npdu::decode(&mut r).unwrap();
2869        let hdr = ConfirmedRequestHeader::decode(&mut r).unwrap();
2870        assert_eq!(hdr.service_choice, SERVICE_ATOMIC_READ_FILE);
2871    }
2872
2873    #[tokio::test]
2874    async fn atomic_read_file_record_decodes_complex_ack() {
2875        let (dl, state) = MockDataLink::new();
2876        let client = BacnetClient::with_datalink(dl).with_response_timeout(Duration::from_secs(1));
2877        let addr = DataLinkAddress::Ip(([192, 168, 1, 41], 47808).into());
2878        let file_object = ObjectId::new(ObjectType::File, 5);
2879
2880        state
2881            .recv
2882            .lock()
2883            .await
2884            .push_back((with_npdu(&atomic_read_file_record_ack_apdu(1)), addr));
2885
2886        let result = client
2887            .atomic_read_file_record(addr, file_object, 7, 2)
2888            .await
2889            .unwrap();
2890
2891        assert_eq!(
2892            result,
2893            AtomicReadFileResult::Record {
2894                end_of_file: false,
2895                file_start_record: 7,
2896                returned_record_count: 2,
2897                file_record_data: vec![vec![0x01, 0x02], vec![0x03, 0x04, 0x05]],
2898            }
2899        );
2900    }
2901
2902    #[tokio::test]
2903    async fn atomic_write_file_stream_decodes_complex_ack() {
2904        let (dl, state) = MockDataLink::new();
2905        let client = BacnetClient::with_datalink(dl).with_response_timeout(Duration::from_secs(1));
2906        let addr = DataLinkAddress::Ip(([192, 168, 1, 42], 47808).into());
2907        let file_object = ObjectId::new(ObjectType::File, 3);
2908
2909        state
2910            .recv
2911            .lock()
2912            .await
2913            .push_back((with_npdu(&atomic_write_file_stream_ack_apdu(1, 128)), addr));
2914
2915        let result = client
2916            .atomic_write_file_stream(addr, file_object, 128, &[1, 2, 3, 4])
2917            .await
2918            .unwrap();
2919
2920        assert_eq!(
2921            result,
2922            AtomicWriteFileResult::Stream {
2923                file_start_position: 128
2924            }
2925        );
2926    }
2927
2928    #[tokio::test]
2929    async fn atomic_write_file_record_decodes_complex_ack() {
2930        let (dl, state) = MockDataLink::new();
2931        let client = BacnetClient::with_datalink(dl).with_response_timeout(Duration::from_secs(1));
2932        let addr = DataLinkAddress::Ip(([192, 168, 1, 43], 47808).into());
2933        let file_object = ObjectId::new(ObjectType::File, 9);
2934        let records: [&[u8]; 2] = [&[0x10, 0x11], &[0x12]];
2935
2936        state
2937            .recv
2938            .lock()
2939            .await
2940            .push_back((with_npdu(&atomic_write_file_record_ack_apdu(1, 7)), addr));
2941
2942        let result = client
2943            .atomic_write_file_record(addr, file_object, 7, &records)
2944            .await
2945            .unwrap();
2946
2947        assert_eq!(
2948            result,
2949            AtomicWriteFileResult::Record {
2950                file_start_record: 7
2951            }
2952        );
2953    }
2954
2955    #[tokio::test]
2956    async fn read_properties_decodes_complex_ack() {
2957        let (dl, state) = MockDataLink::new();
2958        let client = BacnetClient::with_datalink(dl);
2959        let addr = DataLinkAddress::Ip(([192, 168, 1, 5], 47808).into());
2960        let object_id = ObjectId::new(ObjectType::Device, 1);
2961
2962        let mut apdu_buf = [0u8; 256];
2963        let mut w = Writer::new(&mut apdu_buf);
2964        ComplexAckHeader {
2965            segmented: false,
2966            more_follows: false,
2967            invoke_id: 1,
2968            sequence_number: None,
2969            proposed_window_size: None,
2970            service_choice: SERVICE_READ_PROPERTY_MULTIPLE,
2971        }
2972        .encode(&mut w)
2973        .unwrap();
2974        encode_ctx_unsigned(&mut w, 0, object_id.raw()).unwrap();
2975        rustbac_core::encoding::tag::Tag::Opening { tag_num: 1 }
2976            .encode(&mut w)
2977            .unwrap();
2978        encode_ctx_unsigned(&mut w, 2, PropertyId::PresentValue.to_u32()).unwrap();
2979        rustbac_core::encoding::tag::Tag::Opening { tag_num: 4 }
2980            .encode(&mut w)
2981            .unwrap();
2982        encode_app_real(&mut w, 55.5).unwrap();
2983        rustbac_core::encoding::tag::Tag::Closing { tag_num: 4 }
2984            .encode(&mut w)
2985            .unwrap();
2986        rustbac_core::encoding::tag::Tag::Closing { tag_num: 1 }
2987            .encode(&mut w)
2988            .unwrap();
2989
2990        state
2991            .recv
2992            .lock()
2993            .await
2994            .push_back((with_npdu(w.as_written()), addr));
2995
2996        let values = client
2997            .read_property_multiple(addr, object_id, &[PropertyId::PresentValue])
2998            .await
2999            .unwrap();
3000        assert_eq!(values.len(), 1);
3001        assert_eq!(values[0].0, PropertyId::PresentValue);
3002        assert!(matches!(values[0].1, ClientDataValue::Real(v) if (v - 55.5).abs() < f32::EPSILON));
3003
3004        let sent = state.sent.lock().await;
3005        assert_eq!(sent.len(), 1);
3006        let mut r = Reader::new(&sent[0].1);
3007        let _npdu = Npdu::decode(&mut r).unwrap();
3008        let hdr = ConfirmedRequestHeader::decode(&mut r).unwrap();
3009        assert_eq!(hdr.service_choice, SERVICE_READ_PROPERTY_MULTIPLE);
3010    }
3011
3012    #[tokio::test]
3013    async fn read_property_multiple_reassembles_segmented_complex_ack() {
3014        let (dl, state) = MockDataLink::new();
3015        let client = BacnetClient::with_datalink(dl);
3016        let addr = DataLinkAddress::Ip(([192, 168, 1, 12], 47808).into());
3017        let object_id = ObjectId::new(ObjectType::Device, 1);
3018
3019        let mut payload_buf = [0u8; 256];
3020        let mut pw = Writer::new(&mut payload_buf);
3021        encode_ctx_unsigned(&mut pw, 0, object_id.raw()).unwrap();
3022        rustbac_core::encoding::tag::Tag::Opening { tag_num: 1 }
3023            .encode(&mut pw)
3024            .unwrap();
3025        encode_ctx_unsigned(&mut pw, 2, PropertyId::PresentValue.to_u32()).unwrap();
3026        rustbac_core::encoding::tag::Tag::Opening { tag_num: 4 }
3027            .encode(&mut pw)
3028            .unwrap();
3029        encode_app_real(&mut pw, 66.0).unwrap();
3030        rustbac_core::encoding::tag::Tag::Closing { tag_num: 4 }
3031            .encode(&mut pw)
3032            .unwrap();
3033        rustbac_core::encoding::tag::Tag::Closing { tag_num: 1 }
3034            .encode(&mut pw)
3035            .unwrap();
3036        let payload = pw.as_written();
3037        let split = payload.len() / 2;
3038
3039        let mut apdu1 = [0u8; 256];
3040        let mut w1 = Writer::new(&mut apdu1);
3041        ComplexAckHeader {
3042            segmented: true,
3043            more_follows: true,
3044            invoke_id: 1,
3045            sequence_number: Some(0),
3046            proposed_window_size: Some(1),
3047            service_choice: SERVICE_READ_PROPERTY_MULTIPLE,
3048        }
3049        .encode(&mut w1)
3050        .unwrap();
3051        w1.write_all(&payload[..split]).unwrap();
3052
3053        let mut apdu2 = [0u8; 256];
3054        let mut w2 = Writer::new(&mut apdu2);
3055        ComplexAckHeader {
3056            segmented: true,
3057            more_follows: false,
3058            invoke_id: 1,
3059            sequence_number: Some(1),
3060            proposed_window_size: Some(1),
3061            service_choice: SERVICE_READ_PROPERTY_MULTIPLE,
3062        }
3063        .encode(&mut w2)
3064        .unwrap();
3065        w2.write_all(&payload[split..]).unwrap();
3066
3067        state
3068            .recv
3069            .lock()
3070            .await
3071            .push_back((with_npdu(w1.as_written()), addr));
3072        state
3073            .recv
3074            .lock()
3075            .await
3076            .push_back((with_npdu(w2.as_written()), addr));
3077
3078        let values = client
3079            .read_property_multiple(addr, object_id, &[PropertyId::PresentValue])
3080            .await
3081            .unwrap();
3082        assert_eq!(values.len(), 1);
3083        assert!(matches!(values[0].1, ClientDataValue::Real(v) if (v - 66.0).abs() < f32::EPSILON));
3084
3085        let sent = state.sent.lock().await;
3086        assert!(sent.len() >= 3);
3087
3088        let mut saw_segment_ack = 0usize;
3089        for (_, frame) in sent.iter().skip(1) {
3090            let mut r = Reader::new(frame);
3091            let _npdu = Npdu::decode(&mut r).unwrap();
3092            let apdu = r.read_exact(r.remaining()).unwrap();
3093            if (apdu[0] >> 4) == ApduType::SegmentAck as u8 {
3094                let mut sr = Reader::new(apdu);
3095                let sack = SegmentAck::decode(&mut sr).unwrap();
3096                assert_eq!(sack.invoke_id, 1);
3097                saw_segment_ack += 1;
3098            }
3099        }
3100        assert!(saw_segment_ack >= 1);
3101    }
3102
3103    #[tokio::test]
3104    async fn read_property_multiple_tolerates_duplicate_segment() {
3105        let (dl, state) = MockDataLink::new();
3106        let client = BacnetClient::with_datalink(dl);
3107        let addr = DataLinkAddress::Ip(([192, 168, 1, 18], 47808).into());
3108        let object_id = ObjectId::new(ObjectType::Device, 1);
3109
3110        let mut payload_buf = [0u8; 256];
3111        let mut pw = Writer::new(&mut payload_buf);
3112        encode_ctx_unsigned(&mut pw, 0, object_id.raw()).unwrap();
3113        rustbac_core::encoding::tag::Tag::Opening { tag_num: 1 }
3114            .encode(&mut pw)
3115            .unwrap();
3116        encode_ctx_unsigned(&mut pw, 2, PropertyId::PresentValue.to_u32()).unwrap();
3117        rustbac_core::encoding::tag::Tag::Opening { tag_num: 4 }
3118            .encode(&mut pw)
3119            .unwrap();
3120        encode_app_real(&mut pw, 66.0).unwrap();
3121        rustbac_core::encoding::tag::Tag::Closing { tag_num: 4 }
3122            .encode(&mut pw)
3123            .unwrap();
3124        rustbac_core::encoding::tag::Tag::Closing { tag_num: 1 }
3125            .encode(&mut pw)
3126            .unwrap();
3127        let payload = pw.as_written();
3128        let split = payload.len() / 2;
3129
3130        let mut apdu1 = [0u8; 256];
3131        let mut w1 = Writer::new(&mut apdu1);
3132        ComplexAckHeader {
3133            segmented: true,
3134            more_follows: true,
3135            invoke_id: 1,
3136            sequence_number: Some(0),
3137            proposed_window_size: Some(1),
3138            service_choice: SERVICE_READ_PROPERTY_MULTIPLE,
3139        }
3140        .encode(&mut w1)
3141        .unwrap();
3142        w1.write_all(&payload[..split]).unwrap();
3143
3144        let mut dup = [0u8; 256];
3145        let mut wd = Writer::new(&mut dup);
3146        ComplexAckHeader {
3147            segmented: true,
3148            more_follows: true,
3149            invoke_id: 1,
3150            sequence_number: Some(0),
3151            proposed_window_size: Some(1),
3152            service_choice: SERVICE_READ_PROPERTY_MULTIPLE,
3153        }
3154        .encode(&mut wd)
3155        .unwrap();
3156        wd.write_all(&payload[..split]).unwrap();
3157
3158        let mut apdu2 = [0u8; 256];
3159        let mut w2 = Writer::new(&mut apdu2);
3160        ComplexAckHeader {
3161            segmented: true,
3162            more_follows: false,
3163            invoke_id: 1,
3164            sequence_number: Some(1),
3165            proposed_window_size: Some(1),
3166            service_choice: SERVICE_READ_PROPERTY_MULTIPLE,
3167        }
3168        .encode(&mut w2)
3169        .unwrap();
3170        w2.write_all(&payload[split..]).unwrap();
3171
3172        {
3173            let mut recv = state.recv.lock().await;
3174            recv.push_back((with_npdu(w1.as_written()), addr));
3175            recv.push_back((with_npdu(wd.as_written()), addr));
3176            recv.push_back((with_npdu(w2.as_written()), addr));
3177        }
3178
3179        let values = client
3180            .read_property_multiple(addr, object_id, &[PropertyId::PresentValue])
3181            .await
3182            .unwrap();
3183        assert_eq!(values.len(), 1);
3184        assert!(matches!(values[0].1, ClientDataValue::Real(v) if (v - 66.0).abs() < f32::EPSILON));
3185    }
3186
3187    #[tokio::test]
3188    async fn write_properties_handles_simple_ack() {
3189        let (dl, state) = MockDataLink::new();
3190        let client = BacnetClient::with_datalink(dl);
3191        let addr = DataLinkAddress::Ip(([192, 168, 1, 6], 47808).into());
3192        let object_id = ObjectId::new(ObjectType::AnalogOutput, 2);
3193
3194        let mut apdu_buf = [0u8; 32];
3195        let mut w = Writer::new(&mut apdu_buf);
3196        SimpleAck {
3197            invoke_id: 1,
3198            service_choice: SERVICE_WRITE_PROPERTY_MULTIPLE,
3199        }
3200        .encode(&mut w)
3201        .unwrap();
3202        state
3203            .recv
3204            .lock()
3205            .await
3206            .push_back((with_npdu(w.as_written()), addr));
3207
3208        let writes = [PropertyWriteSpec {
3209            property_id: PropertyId::PresentValue,
3210            array_index: None,
3211            value: DataValue::Real(12.5),
3212            priority: Some(8),
3213        }];
3214        client
3215            .write_property_multiple(addr, object_id, &writes)
3216            .await
3217            .unwrap();
3218
3219        let sent = state.sent.lock().await;
3220        assert_eq!(sent.len(), 1);
3221        let mut r = Reader::new(&sent[0].1);
3222        let _npdu = Npdu::decode(&mut r).unwrap();
3223        let hdr = ConfirmedRequestHeader::decode(&mut r).unwrap();
3224        assert_eq!(hdr.service_choice, SERVICE_WRITE_PROPERTY_MULTIPLE);
3225    }
3226
3227    #[tokio::test]
3228    async fn subscribe_cov_handles_simple_ack() {
3229        let (dl, state) = MockDataLink::new();
3230        let client = BacnetClient::with_datalink(dl).with_response_timeout(Duration::from_secs(1));
3231        let addr = DataLinkAddress::Ip(([192, 168, 1, 11], 47808).into());
3232
3233        let mut apdu_buf = [0u8; 32];
3234        let mut w = Writer::new(&mut apdu_buf);
3235        SimpleAck {
3236            invoke_id: 1,
3237            service_choice: SERVICE_SUBSCRIBE_COV,
3238        }
3239        .encode(&mut w)
3240        .unwrap();
3241        state
3242            .recv
3243            .lock()
3244            .await
3245            .push_back((with_npdu(w.as_written()), addr));
3246
3247        client
3248            .subscribe_cov(
3249                addr,
3250                SubscribeCovRequest {
3251                    subscriber_process_id: 10,
3252                    monitored_object_id: ObjectId::new(ObjectType::AnalogInput, 3),
3253                    issue_confirmed_notifications: Some(false),
3254                    lifetime_seconds: Some(300),
3255                    invoke_id: 0,
3256                },
3257            )
3258            .await
3259            .unwrap();
3260
3261        let sent = state.sent.lock().await;
3262        assert_eq!(sent.len(), 1);
3263        let mut r = Reader::new(&sent[0].1);
3264        let _npdu = Npdu::decode(&mut r).unwrap();
3265        let hdr = ConfirmedRequestHeader::decode(&mut r).unwrap();
3266        assert_eq!(hdr.service_choice, SERVICE_SUBSCRIBE_COV);
3267    }
3268
3269    #[tokio::test]
3270    async fn subscribe_cov_property_handles_simple_ack() {
3271        let (dl, state) = MockDataLink::new();
3272        let client = BacnetClient::with_datalink(dl).with_response_timeout(Duration::from_secs(1));
3273        let addr = DataLinkAddress::Ip(([192, 168, 1, 21], 47808).into());
3274
3275        let mut apdu_buf = [0u8; 32];
3276        let mut w = Writer::new(&mut apdu_buf);
3277        SimpleAck {
3278            invoke_id: 1,
3279            service_choice: SERVICE_SUBSCRIBE_COV_PROPERTY,
3280        }
3281        .encode(&mut w)
3282        .unwrap();
3283        state
3284            .recv
3285            .lock()
3286            .await
3287            .push_back((with_npdu(w.as_written()), addr));
3288
3289        client
3290            .subscribe_cov_property(
3291                addr,
3292                SubscribeCovPropertyRequest {
3293                    subscriber_process_id: 22,
3294                    monitored_object_id: ObjectId::new(ObjectType::AnalogInput, 3),
3295                    issue_confirmed_notifications: Some(true),
3296                    lifetime_seconds: Some(120),
3297                    monitored_property_id: PropertyId::PresentValue,
3298                    monitored_property_array_index: None,
3299                    cov_increment: Some(0.1),
3300                    invoke_id: 0,
3301                },
3302            )
3303            .await
3304            .unwrap();
3305
3306        let sent = state.sent.lock().await;
3307        assert_eq!(sent.len(), 1);
3308        let mut r = Reader::new(&sent[0].1);
3309        let _npdu = Npdu::decode(&mut r).unwrap();
3310        let hdr = ConfirmedRequestHeader::decode(&mut r).unwrap();
3311        assert_eq!(hdr.service_choice, SERVICE_SUBSCRIBE_COV_PROPERTY);
3312    }
3313
3314    #[tokio::test]
3315    async fn read_range_by_position_decodes_complex_ack() {
3316        let (dl, state) = MockDataLink::new();
3317        let client = BacnetClient::with_datalink(dl).with_response_timeout(Duration::from_secs(1));
3318        let addr = DataLinkAddress::Ip(([192, 168, 1, 22], 47808).into());
3319        let object_id = ObjectId::new(ObjectType::TrendLog, 1);
3320
3321        let mut apdu_buf = [0u8; 256];
3322        let mut w = Writer::new(&mut apdu_buf);
3323        ComplexAckHeader {
3324            segmented: false,
3325            more_follows: false,
3326            invoke_id: 1,
3327            sequence_number: None,
3328            proposed_window_size: None,
3329            service_choice: SERVICE_READ_RANGE,
3330        }
3331        .encode(&mut w)
3332        .unwrap();
3333        encode_ctx_object_id(&mut w, 0, object_id.raw()).unwrap();
3334        encode_ctx_unsigned(&mut w, 1, PropertyId::PresentValue.to_u32()).unwrap();
3335        Tag::Context { tag_num: 3, len: 2 }.encode(&mut w).unwrap();
3336        w.write_u8(5).unwrap();
3337        w.write_u8(0b1110_0000).unwrap();
3338        encode_ctx_unsigned(&mut w, 4, 2).unwrap();
3339        Tag::Opening { tag_num: 5 }.encode(&mut w).unwrap();
3340        encode_app_real(&mut w, 42.0).unwrap();
3341        encode_app_real(&mut w, 43.0).unwrap();
3342        Tag::Closing { tag_num: 5 }.encode(&mut w).unwrap();
3343
3344        state
3345            .recv
3346            .lock()
3347            .await
3348            .push_back((with_npdu(w.as_written()), addr));
3349
3350        let result = client
3351            .read_range_by_position(addr, object_id, PropertyId::PresentValue, None, 1, 2)
3352            .await
3353            .unwrap();
3354        assert_eq!(result.object_id, object_id);
3355        assert_eq!(result.item_count, 2);
3356        assert_eq!(result.items.len(), 2);
3357        assert!(matches!(
3358            result.items[0],
3359            ClientDataValue::Real(v) if (v - 42.0).abs() < f32::EPSILON
3360        ));
3361    }
3362
3363    #[tokio::test]
3364    async fn read_range_by_sequence_number_encodes_range_selector() {
3365        let (dl, state) = MockDataLink::new();
3366        let client = BacnetClient::with_datalink(dl).with_response_timeout(Duration::from_secs(1));
3367        let addr = DataLinkAddress::Ip(([192, 168, 1, 35], 47808).into());
3368        let object_id = ObjectId::new(ObjectType::TrendLog, 1);
3369
3370        state
3371            .recv
3372            .lock()
3373            .await
3374            .push_back((with_npdu(&read_range_ack_apdu(1, object_id)), addr));
3375
3376        let _ = client
3377            .read_range_by_sequence_number(addr, object_id, PropertyId::PresentValue, None, 20, 2)
3378            .await
3379            .unwrap();
3380
3381        let sent = state.sent.lock().await;
3382        let mut r = Reader::new(&sent[0].1);
3383        let _npdu = Npdu::decode(&mut r).unwrap();
3384        let hdr = ConfirmedRequestHeader::decode(&mut r).unwrap();
3385        assert_eq!(hdr.service_choice, SERVICE_READ_RANGE);
3386        match Tag::decode(&mut r).unwrap() {
3387            Tag::Context { tag_num: 0, len: 4 } => {
3388                let _ = r.read_exact(4).unwrap();
3389            }
3390            other => panic!("unexpected object id tag: {other:?}"),
3391        }
3392        match Tag::decode(&mut r).unwrap() {
3393            Tag::Context { tag_num: 1, len } => {
3394                let _ = decode_unsigned(&mut r, len as usize).unwrap();
3395            }
3396            other => panic!("unexpected property tag: {other:?}"),
3397        }
3398        assert_eq!(Tag::decode(&mut r).unwrap(), Tag::Opening { tag_num: 6 });
3399        match Tag::decode(&mut r).unwrap() {
3400            Tag::Application {
3401                tag: AppTag::UnsignedInt,
3402                len,
3403            } => {
3404                assert_eq!(decode_unsigned(&mut r, len as usize).unwrap(), 20);
3405            }
3406            other => panic!("unexpected ref seq tag: {other:?}"),
3407        }
3408        match Tag::decode(&mut r).unwrap() {
3409            Tag::Application {
3410                tag: AppTag::SignedInt,
3411                len,
3412            } => {
3413                assert_eq!(decode_signed(&mut r, len as usize).unwrap(), 2);
3414            }
3415            other => panic!("unexpected count tag: {other:?}"),
3416        }
3417        assert_eq!(Tag::decode(&mut r).unwrap(), Tag::Closing { tag_num: 6 });
3418    }
3419
3420    #[tokio::test]
3421    async fn read_range_by_time_encodes_range_selector() {
3422        let (dl, state) = MockDataLink::new();
3423        let client = BacnetClient::with_datalink(dl).with_response_timeout(Duration::from_secs(1));
3424        let addr = DataLinkAddress::Ip(([192, 168, 1, 36], 47808).into());
3425        let object_id = ObjectId::new(ObjectType::TrendLog, 1);
3426        let date = Date {
3427            year_since_1900: 126,
3428            month: 2,
3429            day: 7,
3430            weekday: 6,
3431        };
3432        let time = Time {
3433            hour: 10,
3434            minute: 11,
3435            second: 12,
3436            hundredths: 13,
3437        };
3438
3439        state
3440            .recv
3441            .lock()
3442            .await
3443            .push_back((with_npdu(&read_range_ack_apdu(1, object_id)), addr));
3444
3445        let _ = client
3446            .read_range_by_time(
3447                addr,
3448                object_id,
3449                PropertyId::PresentValue,
3450                None,
3451                (date, time),
3452                2,
3453            )
3454            .await
3455            .unwrap();
3456
3457        let sent = state.sent.lock().await;
3458        let mut r = Reader::new(&sent[0].1);
3459        let _npdu = Npdu::decode(&mut r).unwrap();
3460        let hdr = ConfirmedRequestHeader::decode(&mut r).unwrap();
3461        assert_eq!(hdr.service_choice, SERVICE_READ_RANGE);
3462        match Tag::decode(&mut r).unwrap() {
3463            Tag::Context { tag_num: 0, len: 4 } => {
3464                let _ = r.read_exact(4).unwrap();
3465            }
3466            other => panic!("unexpected object id tag: {other:?}"),
3467        }
3468        match Tag::decode(&mut r).unwrap() {
3469            Tag::Context { tag_num: 1, len } => {
3470                let _ = decode_unsigned(&mut r, len as usize).unwrap();
3471            }
3472            other => panic!("unexpected property tag: {other:?}"),
3473        }
3474        assert_eq!(Tag::decode(&mut r).unwrap(), Tag::Opening { tag_num: 7 });
3475        match Tag::decode(&mut r).unwrap() {
3476            Tag::Application {
3477                tag: AppTag::Date,
3478                len: 4,
3479            } => {
3480                let raw = r.read_exact(4).unwrap();
3481                assert_eq!(
3482                    raw,
3483                    &[date.year_since_1900, date.month, date.day, date.weekday]
3484                );
3485            }
3486            other => panic!("unexpected date tag: {other:?}"),
3487        }
3488        match Tag::decode(&mut r).unwrap() {
3489            Tag::Application {
3490                tag: AppTag::Time,
3491                len: 4,
3492            } => {
3493                let raw = r.read_exact(4).unwrap();
3494                assert_eq!(raw, &[time.hour, time.minute, time.second, time.hundredths]);
3495            }
3496            other => panic!("unexpected time tag: {other:?}"),
3497        }
3498        match Tag::decode(&mut r).unwrap() {
3499            Tag::Application {
3500                tag: AppTag::SignedInt,
3501                len,
3502            } => {
3503                assert_eq!(decode_signed(&mut r, len as usize).unwrap(), 2);
3504            }
3505            other => panic!("unexpected count tag: {other:?}"),
3506        }
3507        assert_eq!(Tag::decode(&mut r).unwrap(), Tag::Closing { tag_num: 7 });
3508    }
3509
3510    #[tokio::test]
3511    async fn recv_unconfirmed_cov_notification_returns_decoded_value() {
3512        let (dl, state) = MockDataLink::new();
3513        let client = BacnetClient::with_datalink(dl);
3514        let addr = DataLinkAddress::Ip(([192, 168, 1, 12], 47808).into());
3515
3516        let mut apdu = [0u8; 256];
3517        let mut w = Writer::new(&mut apdu);
3518        UnconfirmedRequestHeader {
3519            service_choice: SERVICE_UNCONFIRMED_COV_NOTIFICATION,
3520        }
3521        .encode(&mut w)
3522        .unwrap();
3523        encode_ctx_unsigned(&mut w, 0, 17).unwrap();
3524        encode_ctx_unsigned(&mut w, 1, ObjectId::new(ObjectType::Device, 1).raw()).unwrap();
3525        encode_ctx_unsigned(&mut w, 2, ObjectId::new(ObjectType::AnalogInput, 1).raw()).unwrap();
3526        encode_ctx_unsigned(&mut w, 3, 60).unwrap();
3527        Tag::Opening { tag_num: 4 }.encode(&mut w).unwrap();
3528        encode_ctx_unsigned(&mut w, 0, PropertyId::PresentValue.to_u32()).unwrap();
3529        Tag::Opening { tag_num: 2 }.encode(&mut w).unwrap();
3530        encode_app_real(&mut w, 73.25).unwrap();
3531        Tag::Closing { tag_num: 2 }.encode(&mut w).unwrap();
3532        Tag::Closing { tag_num: 4 }.encode(&mut w).unwrap();
3533
3534        state
3535            .recv
3536            .lock()
3537            .await
3538            .push_back((with_npdu(w.as_written()), addr));
3539
3540        let notification = client
3541            .recv_cov_notification(Duration::from_secs(1))
3542            .await
3543            .unwrap()
3544            .unwrap();
3545        assert!(!notification.confirmed);
3546        assert_eq!(notification.subscriber_process_id, 17);
3547        assert_eq!(notification.values.len(), 1);
3548        assert_eq!(notification.values[0].property_id, PropertyId::PresentValue);
3549        assert!(matches!(
3550            notification.values[0].value,
3551            ClientDataValue::Real(v) if (v - 73.25).abs() < f32::EPSILON
3552        ));
3553
3554        let sent = state.sent.lock().await;
3555        assert!(sent.is_empty());
3556    }
3557
3558    #[tokio::test]
3559    async fn recv_confirmed_cov_notification_sends_simple_ack() {
3560        let (dl, state) = MockDataLink::new();
3561        let client = BacnetClient::with_datalink(dl);
3562        let addr = DataLinkAddress::Ip(([192, 168, 1, 13], 47808).into());
3563
3564        let mut apdu = [0u8; 256];
3565        let mut w = Writer::new(&mut apdu);
3566        ConfirmedRequestHeader {
3567            segmented: false,
3568            more_follows: false,
3569            segmented_response_accepted: false,
3570            max_segments: 0,
3571            max_apdu: 5,
3572            invoke_id: 9,
3573            sequence_number: None,
3574            proposed_window_size: None,
3575            service_choice: SERVICE_CONFIRMED_COV_NOTIFICATION,
3576        }
3577        .encode(&mut w)
3578        .unwrap();
3579        encode_ctx_unsigned(&mut w, 0, 18).unwrap();
3580        encode_ctx_unsigned(&mut w, 1, ObjectId::new(ObjectType::Device, 1).raw()).unwrap();
3581        encode_ctx_unsigned(&mut w, 2, ObjectId::new(ObjectType::AnalogInput, 2).raw()).unwrap();
3582        encode_ctx_unsigned(&mut w, 3, 120).unwrap();
3583        Tag::Opening { tag_num: 4 }.encode(&mut w).unwrap();
3584        encode_ctx_unsigned(&mut w, 0, PropertyId::PresentValue.to_u32()).unwrap();
3585        Tag::Opening { tag_num: 2 }.encode(&mut w).unwrap();
3586        encode_app_real(&mut w, 55.0).unwrap();
3587        Tag::Closing { tag_num: 2 }.encode(&mut w).unwrap();
3588        Tag::Closing { tag_num: 4 }.encode(&mut w).unwrap();
3589
3590        state
3591            .recv
3592            .lock()
3593            .await
3594            .push_back((with_npdu(w.as_written()), addr));
3595
3596        let notification = client
3597            .recv_cov_notification(Duration::from_secs(1))
3598            .await
3599            .unwrap()
3600            .unwrap();
3601        assert!(notification.confirmed);
3602        assert_eq!(notification.values.len(), 1);
3603
3604        let sent = state.sent.lock().await;
3605        assert_eq!(sent.len(), 1);
3606        let mut r = Reader::new(&sent[0].1);
3607        let _npdu = Npdu::decode(&mut r).unwrap();
3608        let ack = SimpleAck::decode(&mut r).unwrap();
3609        assert_eq!(ack.invoke_id, 9);
3610        assert_eq!(ack.service_choice, SERVICE_CONFIRMED_COV_NOTIFICATION);
3611    }
3612
3613    #[tokio::test]
3614    async fn recv_unconfirmed_event_notification_returns_decoded_value() {
3615        let (dl, state) = MockDataLink::new();
3616        let client = BacnetClient::with_datalink(dl);
3617        let addr = DataLinkAddress::Ip(([192, 168, 1, 16], 47808).into());
3618
3619        let mut apdu = [0u8; 256];
3620        let mut w = Writer::new(&mut apdu);
3621        UnconfirmedRequestHeader {
3622            service_choice: SERVICE_UNCONFIRMED_EVENT_NOTIFICATION,
3623        }
3624        .encode(&mut w)
3625        .unwrap();
3626        encode_ctx_unsigned(&mut w, 0, 99).unwrap();
3627        encode_ctx_unsigned(&mut w, 1, ObjectId::new(ObjectType::Device, 1).raw()).unwrap();
3628        encode_ctx_unsigned(&mut w, 2, ObjectId::new(ObjectType::AnalogInput, 6).raw()).unwrap();
3629        Tag::Opening { tag_num: 3 }.encode(&mut w).unwrap();
3630        encode_ctx_unsigned(&mut w, 1, 55).unwrap();
3631        Tag::Closing { tag_num: 3 }.encode(&mut w).unwrap();
3632        encode_ctx_unsigned(&mut w, 4, 7).unwrap();
3633        encode_ctx_unsigned(&mut w, 5, 100).unwrap();
3634        encode_ctx_unsigned(&mut w, 6, 2).unwrap();
3635        encode_ctx_character_string(&mut w, 7, "fan alarm").unwrap();
3636        encode_ctx_unsigned(&mut w, 8, 0).unwrap();
3637        Tag::Context { tag_num: 9, len: 1 }.encode(&mut w).unwrap();
3638        encode_ctx_unsigned(&mut w, 10, 2).unwrap();
3639        encode_ctx_unsigned(&mut w, 11, 0).unwrap();
3640        Tag::Opening { tag_num: 12 }.encode(&mut w).unwrap();
3641        Tag::Opening { tag_num: 0 }.encode(&mut w).unwrap();
3642        encode_ctx_unsigned(&mut w, 0, 1).unwrap();
3643        Tag::Closing { tag_num: 0 }.encode(&mut w).unwrap();
3644        Tag::Closing { tag_num: 12 }.encode(&mut w).unwrap();
3645
3646        state
3647            .recv
3648            .lock()
3649            .await
3650            .push_back((with_npdu(w.as_written()), addr));
3651
3652        let notification: EventNotification = client
3653            .recv_event_notification(Duration::from_secs(1))
3654            .await
3655            .unwrap()
3656            .unwrap();
3657        assert!(!notification.confirmed);
3658        assert_eq!(notification.process_id, 99);
3659        assert_eq!(notification.message_text.as_deref(), Some("fan alarm"));
3660        assert_eq!(notification.ack_required, Some(true));
3661        assert_eq!(notification.from_state, Some(EventState::Offnormal));
3662        assert_eq!(notification.to_state, Some(EventState::Normal));
3663        assert_eq!(notification.notify_type, 0);
3664
3665        let sent = state.sent.lock().await;
3666        assert!(sent.is_empty());
3667    }
3668
3669    #[tokio::test]
3670    async fn recv_confirmed_event_notification_sends_simple_ack() {
3671        let (dl, state) = MockDataLink::new();
3672        let client = BacnetClient::with_datalink(dl);
3673        let addr = DataLinkAddress::Ip(([192, 168, 1, 17], 47808).into());
3674
3675        let mut apdu = [0u8; 256];
3676        let mut w = Writer::new(&mut apdu);
3677        ConfirmedRequestHeader {
3678            segmented: false,
3679            more_follows: false,
3680            segmented_response_accepted: false,
3681            max_segments: 0,
3682            max_apdu: 5,
3683            invoke_id: 11,
3684            sequence_number: None,
3685            proposed_window_size: None,
3686            service_choice: SERVICE_CONFIRMED_EVENT_NOTIFICATION,
3687        }
3688        .encode(&mut w)
3689        .unwrap();
3690        encode_ctx_unsigned(&mut w, 0, 100).unwrap();
3691        encode_ctx_unsigned(&mut w, 1, ObjectId::new(ObjectType::Device, 1).raw()).unwrap();
3692        encode_ctx_unsigned(&mut w, 2, ObjectId::new(ObjectType::AnalogInput, 7).raw()).unwrap();
3693        Tag::Opening { tag_num: 3 }.encode(&mut w).unwrap();
3694        encode_ctx_unsigned(&mut w, 1, 56).unwrap();
3695        Tag::Closing { tag_num: 3 }.encode(&mut w).unwrap();
3696        encode_ctx_unsigned(&mut w, 4, 7).unwrap();
3697        encode_ctx_unsigned(&mut w, 5, 100).unwrap();
3698        encode_ctx_unsigned(&mut w, 6, 2).unwrap();
3699        encode_ctx_unsigned(&mut w, 8, 0).unwrap();
3700        encode_ctx_unsigned(&mut w, 10, 2).unwrap();
3701        encode_ctx_unsigned(&mut w, 11, 0).unwrap();
3702        Tag::Opening { tag_num: 12 }.encode(&mut w).unwrap();
3703        Tag::Opening { tag_num: 0 }.encode(&mut w).unwrap();
3704        encode_ctx_unsigned(&mut w, 0, 1).unwrap();
3705        Tag::Closing { tag_num: 0 }.encode(&mut w).unwrap();
3706        Tag::Closing { tag_num: 12 }.encode(&mut w).unwrap();
3707
3708        state
3709            .recv
3710            .lock()
3711            .await
3712            .push_back((with_npdu(w.as_written()), addr));
3713
3714        let notification = client
3715            .recv_event_notification(Duration::from_secs(1))
3716            .await
3717            .unwrap()
3718            .unwrap();
3719        assert!(notification.confirmed);
3720
3721        let sent = state.sent.lock().await;
3722        assert_eq!(sent.len(), 1);
3723        let mut r = Reader::new(&sent[0].1);
3724        let _npdu = Npdu::decode(&mut r).unwrap();
3725        let ack = SimpleAck::decode(&mut r).unwrap();
3726        assert_eq!(ack.invoke_id, 11);
3727        assert_eq!(ack.service_choice, SERVICE_CONFIRMED_EVENT_NOTIFICATION);
3728    }
3729
3730    #[tokio::test]
3731    async fn write_property_multiple_segments_large_request() {
3732        let (dl, state) = MockDataLink::new();
3733        let client = BacnetClient::with_datalink(dl).with_response_timeout(Duration::from_secs(1));
3734        let addr = DataLinkAddress::Ip(([192, 168, 1, 10], 47808).into());
3735        let object_id = ObjectId::new(ObjectType::AnalogOutput, 5);
3736
3737        {
3738            let mut recv = state.recv.lock().await;
3739            for seq in 0u8..=254 {
3740                let mut apdu = [0u8; 16];
3741                let mut w = Writer::new(&mut apdu);
3742                SegmentAck {
3743                    negative_ack: false,
3744                    sent_by_server: true,
3745                    invoke_id: 1,
3746                    sequence_number: seq,
3747                    actual_window_size: 1,
3748                }
3749                .encode(&mut w)
3750                .unwrap();
3751                recv.push_back((with_npdu(w.as_written()), addr));
3752            }
3753
3754            let mut apdu = [0u8; 16];
3755            let mut w = Writer::new(&mut apdu);
3756            SimpleAck {
3757                invoke_id: 1,
3758                service_choice: SERVICE_WRITE_PROPERTY_MULTIPLE,
3759            }
3760            .encode(&mut w)
3761            .unwrap();
3762            recv.push_back((with_npdu(w.as_written()), addr));
3763        }
3764
3765        let writes: Vec<PropertyWriteSpec> = (0..180)
3766            .map(|_| PropertyWriteSpec {
3767                property_id: PropertyId::Description,
3768                array_index: None,
3769                value: DataValue::CharacterString(
3770                    "rustbac segmented write test payload................................................................",
3771                ),
3772                priority: None,
3773            })
3774            .collect();
3775
3776        client
3777            .write_property_multiple(addr, object_id, &writes)
3778            .await
3779            .unwrap();
3780
3781        let sent = state.sent.lock().await;
3782        assert!(sent.len() > 1);
3783
3784        let mut seqs = Vec::new();
3785        let mut saw_more_follows = false;
3786        let mut saw_last = false;
3787        for (_, frame) in sent.iter() {
3788            let mut r = Reader::new(frame);
3789            let _npdu = Npdu::decode(&mut r).unwrap();
3790            let hdr = ConfirmedRequestHeader::decode(&mut r).unwrap();
3791            assert!(hdr.segmented);
3792            assert_eq!(hdr.service_choice, SERVICE_WRITE_PROPERTY_MULTIPLE);
3793            if hdr.more_follows {
3794                saw_more_follows = true;
3795            } else {
3796                saw_last = true;
3797            }
3798            seqs.push(hdr.sequence_number.unwrap());
3799        }
3800
3801        assert!(saw_more_follows);
3802        assert!(saw_last);
3803        for (idx, seq) in seqs.iter().enumerate() {
3804            assert_eq!(*seq as usize, idx);
3805        }
3806    }
3807
3808    #[tokio::test]
3809    async fn write_property_multiple_uses_configured_segment_window() {
3810        let (dl, state) = MockDataLink::new();
3811        let client = BacnetClient::with_datalink(dl)
3812            .with_response_timeout(Duration::from_secs(1))
3813            .with_segmented_request_window_size(4);
3814        let addr = DataLinkAddress::Ip(([192, 168, 1, 14], 47808).into());
3815        let object_id = ObjectId::new(ObjectType::AnalogOutput, 6);
3816
3817        {
3818            let mut recv = state.recv.lock().await;
3819            for seq in 0u8..=254 {
3820                let mut apdu = [0u8; 16];
3821                let mut w = Writer::new(&mut apdu);
3822                SegmentAck {
3823                    negative_ack: false,
3824                    sent_by_server: true,
3825                    invoke_id: 1,
3826                    sequence_number: seq,
3827                    actual_window_size: 4,
3828                }
3829                .encode(&mut w)
3830                .unwrap();
3831                recv.push_back((with_npdu(w.as_written()), addr));
3832            }
3833
3834            let mut apdu = [0u8; 16];
3835            let mut w = Writer::new(&mut apdu);
3836            SimpleAck {
3837                invoke_id: 1,
3838                service_choice: SERVICE_WRITE_PROPERTY_MULTIPLE,
3839            }
3840            .encode(&mut w)
3841            .unwrap();
3842            recv.push_back((with_npdu(w.as_written()), addr));
3843        }
3844
3845        let writes: Vec<PropertyWriteSpec> = (0..180)
3846            .map(|_| PropertyWriteSpec {
3847                property_id: PropertyId::Description,
3848                array_index: None,
3849                value: DataValue::CharacterString(
3850                    "rustbac segmented write test payload................................................................",
3851                ),
3852                priority: None,
3853            })
3854            .collect();
3855
3856        client
3857            .write_property_multiple(addr, object_id, &writes)
3858            .await
3859            .unwrap();
3860
3861        let sent = state.sent.lock().await;
3862        assert!(sent.len() > 4);
3863        for (idx, (_, frame)) in sent.iter().take(4).enumerate() {
3864            let mut r = Reader::new(frame);
3865            let _npdu = Npdu::decode(&mut r).unwrap();
3866            let hdr = ConfirmedRequestHeader::decode(&mut r).unwrap();
3867            assert!(hdr.segmented);
3868            assert_eq!(hdr.service_choice, SERVICE_WRITE_PROPERTY_MULTIPLE);
3869            assert_eq!(hdr.sequence_number, Some(idx as u8));
3870            assert_eq!(hdr.proposed_window_size, Some(4));
3871        }
3872    }
3873
3874    #[tokio::test]
3875    async fn write_property_multiple_adapts_window_to_peer_ack_window() {
3876        let (dl, state) = MockDataLink::new();
3877        let client = BacnetClient::with_datalink(dl)
3878            .with_response_timeout(Duration::from_secs(1))
3879            .with_segmented_request_window_size(4);
3880        let addr = DataLinkAddress::Ip(([192, 168, 1, 19], 47808).into());
3881        let object_id = ObjectId::new(ObjectType::AnalogOutput, 9);
3882
3883        {
3884            let mut recv = state.recv.lock().await;
3885            for seq in 0u8..=254 {
3886                let mut apdu = [0u8; 16];
3887                let mut w = Writer::new(&mut apdu);
3888                SegmentAck {
3889                    negative_ack: false,
3890                    sent_by_server: true,
3891                    invoke_id: 1,
3892                    sequence_number: seq,
3893                    actual_window_size: 2,
3894                }
3895                .encode(&mut w)
3896                .unwrap();
3897                recv.push_back((with_npdu(w.as_written()), addr));
3898            }
3899
3900            let mut apdu = [0u8; 16];
3901            let mut w = Writer::new(&mut apdu);
3902            SimpleAck {
3903                invoke_id: 1,
3904                service_choice: SERVICE_WRITE_PROPERTY_MULTIPLE,
3905            }
3906            .encode(&mut w)
3907            .unwrap();
3908            recv.push_back((with_npdu(w.as_written()), addr));
3909        }
3910
3911        let writes: Vec<PropertyWriteSpec> = (0..180)
3912            .map(|_| PropertyWriteSpec {
3913                property_id: PropertyId::Description,
3914                array_index: None,
3915                value: DataValue::CharacterString(
3916                    "rustbac segmented write test payload................................................................",
3917                ),
3918                priority: None,
3919            })
3920            .collect();
3921
3922        client
3923            .write_property_multiple(addr, object_id, &writes)
3924            .await
3925            .unwrap();
3926
3927        let sent = state.sent.lock().await;
3928        let mut saw_adapted_window = false;
3929        for (_, frame) in sent.iter() {
3930            let mut r = Reader::new(frame);
3931            let _npdu = Npdu::decode(&mut r).unwrap();
3932            let hdr = ConfirmedRequestHeader::decode(&mut r).unwrap();
3933            if hdr.sequence_number.unwrap_or(0) >= 4 && hdr.proposed_window_size == Some(2) {
3934                saw_adapted_window = true;
3935                break;
3936            }
3937        }
3938        assert!(saw_adapted_window);
3939    }
3940
3941    #[tokio::test]
3942    async fn write_property_multiple_retries_segment_batch_on_negative_ack() {
3943        let (dl, state) = MockDataLink::new();
3944        let client = BacnetClient::with_datalink(dl)
3945            .with_response_timeout(Duration::from_secs(1))
3946            .with_segmented_request_window_size(1)
3947            .with_segmented_request_retries(1);
3948        let addr = DataLinkAddress::Ip(([192, 168, 1, 15], 47808).into());
3949        let object_id = ObjectId::new(ObjectType::AnalogOutput, 7);
3950
3951        {
3952            let mut recv = state.recv.lock().await;
3953
3954            let mut nack_apdu = [0u8; 16];
3955            let mut nack_w = Writer::new(&mut nack_apdu);
3956            SegmentAck {
3957                negative_ack: true,
3958                sent_by_server: true,
3959                invoke_id: 1,
3960                sequence_number: 0,
3961                actual_window_size: 1,
3962            }
3963            .encode(&mut nack_w)
3964            .unwrap();
3965            recv.push_back((with_npdu(nack_w.as_written()), addr));
3966
3967            for seq in 0u8..=254 {
3968                let mut apdu = [0u8; 16];
3969                let mut w = Writer::new(&mut apdu);
3970                SegmentAck {
3971                    negative_ack: false,
3972                    sent_by_server: true,
3973                    invoke_id: 1,
3974                    sequence_number: seq,
3975                    actual_window_size: 1,
3976                }
3977                .encode(&mut w)
3978                .unwrap();
3979                recv.push_back((with_npdu(w.as_written()), addr));
3980            }
3981
3982            let mut apdu = [0u8; 16];
3983            let mut w = Writer::new(&mut apdu);
3984            SimpleAck {
3985                invoke_id: 1,
3986                service_choice: SERVICE_WRITE_PROPERTY_MULTIPLE,
3987            }
3988            .encode(&mut w)
3989            .unwrap();
3990            recv.push_back((with_npdu(w.as_written()), addr));
3991        }
3992
3993        let writes: Vec<PropertyWriteSpec> = (0..180)
3994            .map(|_| PropertyWriteSpec {
3995                property_id: PropertyId::Description,
3996                array_index: None,
3997                value: DataValue::CharacterString(
3998                    "rustbac segmented write test payload................................................................",
3999                ),
4000                priority: None,
4001            })
4002            .collect();
4003
4004        client
4005            .write_property_multiple(addr, object_id, &writes)
4006            .await
4007            .unwrap();
4008
4009        let sent = state.sent.lock().await;
4010        let mut seq0_frames = 0usize;
4011        for (_, frame) in sent.iter() {
4012            let mut r = Reader::new(frame);
4013            let _npdu = Npdu::decode(&mut r).unwrap();
4014            let hdr = ConfirmedRequestHeader::decode(&mut r).unwrap();
4015            if hdr.sequence_number == Some(0) {
4016                seq0_frames += 1;
4017            }
4018        }
4019        assert!(seq0_frames >= 2);
4020    }
4021
4022    #[tokio::test]
4023    async fn read_property_ignores_invalid_frames_until_valid_response() {
4024        let (dl, state) = MockDataLink::new();
4025        let client = BacnetClient::with_datalink(dl).with_response_timeout(Duration::from_secs(1));
4026        let addr = DataLinkAddress::Ip(([192, 168, 1, 16], 47808).into());
4027        let state_for_task = state.clone();
4028
4029        tokio::spawn(async move {
4030            tokio::time::sleep(Duration::from_millis(20)).await;
4031            let mut apdu = [0u8; 128];
4032            let mut w = Writer::new(&mut apdu);
4033            ComplexAckHeader {
4034                segmented: false,
4035                more_follows: false,
4036                invoke_id: 1,
4037                sequence_number: None,
4038                proposed_window_size: None,
4039                service_choice: SERVICE_READ_PROPERTY,
4040            }
4041            .encode(&mut w)
4042            .unwrap();
4043            encode_ctx_object_id(&mut w, 0, ObjectId::new(ObjectType::Device, 1).raw()).unwrap();
4044            encode_ctx_unsigned(&mut w, 1, PropertyId::PresentValue.to_u32()).unwrap();
4045            Tag::Opening { tag_num: 3 }.encode(&mut w).unwrap();
4046            encode_app_real(&mut w, 77.0).unwrap();
4047            Tag::Closing { tag_num: 3 }.encode(&mut w).unwrap();
4048            state_for_task
4049                .recv
4050                .lock()
4051                .await
4052                .push_back((with_npdu(w.as_written()), addr));
4053        });
4054
4055        let value = client
4056            .read_property(
4057                addr,
4058                ObjectId::new(ObjectType::Device, 1),
4059                PropertyId::PresentValue,
4060            )
4061            .await
4062            .unwrap();
4063        assert!(matches!(
4064            value,
4065            ClientDataValue::Real(v) if (v - 77.0).abs() < f32::EPSILON
4066        ));
4067    }
4068
4069    #[tokio::test]
4070    async fn read_property_maps_reject() {
4071        let (dl, state) = MockDataLink::new();
4072        let client = BacnetClient::with_datalink(dl);
4073        let addr = DataLinkAddress::Ip(([192, 168, 1, 7], 47808).into());
4074
4075        let mut apdu = [0u8; 8];
4076        let mut w = Writer::new(&mut apdu);
4077        w.write_u8((ApduType::Reject as u8) << 4).unwrap();
4078        w.write_u8(1).unwrap(); // invoke id
4079        w.write_u8(2).unwrap(); // reason
4080        state
4081            .recv
4082            .lock()
4083            .await
4084            .push_back((with_npdu(w.as_written()), addr));
4085
4086        let err = client
4087            .read_property(
4088                addr,
4089                ObjectId::new(ObjectType::Device, 1),
4090                PropertyId::ObjectName,
4091            )
4092            .await
4093            .unwrap_err();
4094        assert!(matches!(
4095            err,
4096            crate::ClientError::RemoteReject { reason: 2 }
4097        ));
4098    }
4099
4100    #[tokio::test]
4101    async fn read_property_maps_remote_error_details() {
4102        let (dl, state) = MockDataLink::new();
4103        let client = BacnetClient::with_datalink(dl);
4104        let addr = DataLinkAddress::Ip(([192, 168, 1, 17], 47808).into());
4105
4106        let mut apdu = [0u8; 16];
4107        let mut w = Writer::new(&mut apdu);
4108        w.write_u8((ApduType::Error as u8) << 4).unwrap();
4109        w.write_u8(1).unwrap(); // invoke id
4110        w.write_u8(rustbac_core::services::read_property::SERVICE_READ_PROPERTY)
4111            .unwrap();
4112        Tag::Context { tag_num: 0, len: 1 }.encode(&mut w).unwrap();
4113        w.write_u8(2).unwrap(); // property class
4114        Tag::Context { tag_num: 1, len: 1 }.encode(&mut w).unwrap();
4115        w.write_u8(32).unwrap(); // unknownProperty
4116
4117        state
4118            .recv
4119            .lock()
4120            .await
4121            .push_back((with_npdu(w.as_written()), addr));
4122
4123        let err = client
4124            .read_property(
4125                addr,
4126                ObjectId::new(ObjectType::Device, 1),
4127                PropertyId::ObjectName,
4128            )
4129            .await
4130            .unwrap_err();
4131        assert!(matches!(
4132            err,
4133            crate::ClientError::RemoteServiceError {
4134                service_choice: rustbac_core::services::read_property::SERVICE_READ_PROPERTY,
4135                error_class_raw: Some(2),
4136                error_code_raw: Some(32),
4137                error_class: Some(rustbac_core::types::ErrorClass::Property),
4138                error_code: Some(rustbac_core::types::ErrorCode::UnknownProperty),
4139            }
4140        ));
4141    }
4142
4143    #[tokio::test]
4144    async fn write_property_maps_abort() {
4145        let (dl, state) = MockDataLink::new();
4146        let client = BacnetClient::with_datalink(dl);
4147        let addr = DataLinkAddress::Ip(([192, 168, 1, 8], 47808).into());
4148
4149        let mut apdu = [0u8; 8];
4150        let mut w = Writer::new(&mut apdu);
4151        w.write_u8(((ApduType::Abort as u8) << 4) | 0x01).unwrap(); // server abort
4152        w.write_u8(1).unwrap(); // invoke id
4153        w.write_u8(9).unwrap(); // reason
4154        state
4155            .recv
4156            .lock()
4157            .await
4158            .push_back((with_npdu(w.as_written()), addr));
4159
4160        let req = rustbac_core::services::write_property::WritePropertyRequest {
4161            object_id: ObjectId::new(ObjectType::AnalogOutput, 1),
4162            property_id: PropertyId::PresentValue,
4163            value: DataValue::Real(10.0),
4164            priority: Some(8),
4165            ..Default::default()
4166        };
4167        let err = client.write_property(addr, req).await.unwrap_err();
4168        assert!(matches!(
4169            err,
4170            crate::ClientError::RemoteAbort {
4171                reason: 9,
4172                server: true
4173            }
4174        ));
4175    }
4176
4177    #[tokio::test]
4178    async fn read_property_multiple_returns_owned_string() {
4179        let (dl, state) = MockDataLink::new();
4180        let client = BacnetClient::with_datalink(dl);
4181        let addr = DataLinkAddress::Ip(([192, 168, 1, 9], 47808).into());
4182        let object_id = ObjectId::new(ObjectType::Device, 1);
4183
4184        let mut apdu_buf = [0u8; 256];
4185        let mut w = Writer::new(&mut apdu_buf);
4186        ComplexAckHeader {
4187            segmented: false,
4188            more_follows: false,
4189            invoke_id: 1,
4190            sequence_number: None,
4191            proposed_window_size: None,
4192            service_choice: SERVICE_READ_PROPERTY_MULTIPLE,
4193        }
4194        .encode(&mut w)
4195        .unwrap();
4196        encode_ctx_unsigned(&mut w, 0, object_id.raw()).unwrap();
4197        rustbac_core::encoding::tag::Tag::Opening { tag_num: 1 }
4198            .encode(&mut w)
4199            .unwrap();
4200        encode_ctx_unsigned(&mut w, 2, PropertyId::ObjectName.to_u32()).unwrap();
4201        rustbac_core::encoding::tag::Tag::Opening { tag_num: 4 }
4202            .encode(&mut w)
4203            .unwrap();
4204        rustbac_core::services::value_codec::encode_application_data_value(
4205            &mut w,
4206            &DataValue::CharacterString("AHU-1"),
4207        )
4208        .unwrap();
4209        rustbac_core::encoding::tag::Tag::Closing { tag_num: 4 }
4210            .encode(&mut w)
4211            .unwrap();
4212        rustbac_core::encoding::tag::Tag::Closing { tag_num: 1 }
4213            .encode(&mut w)
4214            .unwrap();
4215
4216        state
4217            .recv
4218            .lock()
4219            .await
4220            .push_back((with_npdu(w.as_written()), addr));
4221
4222        let values = client
4223            .read_property_multiple(addr, object_id, &[PropertyId::ObjectName])
4224            .await
4225            .unwrap();
4226        assert_eq!(values.len(), 1);
4227        assert_eq!(values[0].0, PropertyId::ObjectName);
4228        assert!(matches!(
4229            &values[0].1,
4230            ClientDataValue::CharacterString(s) if s == "AHU-1"
4231        ));
4232    }
4233
4234    #[tokio::test]
4235    async fn new_sc_rejects_invalid_endpoint() {
4236        let err = BacnetClient::new_sc("not a url").await.unwrap_err();
4237        assert!(matches!(err, crate::ClientError::DataLink(_)));
4238    }
4239}