Skip to main content

bacnet_client/
client.rs

1//! BACnetClient: high-level and low-level request APIs.
2//!
3//! The client owns a NetworkLayer, spawns an APDU dispatch task, and provides
4//! methods for sending confirmed and unconfirmed BACnet requests.
5
6use std::collections::HashMap;
7use std::net::{Ipv4Addr, Ipv6Addr};
8use std::sync::Arc;
9use std::time::Instant;
10
11use bytes::{Bytes, BytesMut};
12use tokio::sync::{broadcast, mpsc, Mutex};
13use tokio::task::JoinHandle;
14use tokio::time::{timeout, Duration};
15use tracing::{debug, warn};
16
17use bacnet_encoding::apdu::{
18    self, encode_apdu, Apdu, ConfirmedRequest as ConfirmedRequestPdu, SegmentAck as SegmentAckPdu,
19    SimpleAck,
20};
21use bacnet_network::layer::NetworkLayer;
22use bacnet_services::cov::COVNotificationRequest;
23use bacnet_transport::bip::BipTransport;
24use bacnet_transport::bip6::Bip6Transport;
25use bacnet_transport::port::TransportPort;
26use bacnet_types::enums::{ConfirmedServiceChoice, NetworkPriority, UnconfirmedServiceChoice};
27use bacnet_types::error::Error;
28use bacnet_types::MacAddr;
29
30use crate::discovery::{DeviceTable, DiscoveredDevice};
31use crate::segmentation::{max_segment_payload, split_payload, SegmentReceiver, SegmentedPduType};
32use crate::tsm::{Tsm, TsmConfig, TsmResponse};
33
/// Tunable parameters for a BACnet client.
///
/// The builders in this module start from [`ClientConfig::default`] and
/// overwrite individual fields before the client is started.
#[derive(Debug, Clone)]
pub struct ClientConfig {
    /// IPv4 address of the local interface to bind.
    pub interface: Ipv4Addr,
    /// UDP port to bind; `0` asks for an ephemeral port.
    pub port: u16,
    /// Directed broadcast address.
    pub broadcast_address: Ipv4Addr,
    /// APDU timeout, expressed in milliseconds.
    pub apdu_timeout_ms: u64,
    /// How many times an APDU is retried.
    pub apdu_retries: u8,
    /// Largest APDU length this client accepts.
    pub max_apdu_length: u16,
    /// Largest number of segments this client accepts (`None` = unspecified).
    pub max_segments: Option<u8>,
    /// `true` when this client accepts segmented responses.
    pub segmented_response_accepted: bool,
    /// Window size proposed for segmented transfers (1-127, default 1).
    pub proposed_window_size: u8,
}
56
57impl Default for ClientConfig {
58    fn default() -> Self {
59        Self {
60            interface: Ipv4Addr::UNSPECIFIED,
61            port: 0xBAC0,
62            broadcast_address: Ipv4Addr::BROADCAST,
63            apdu_timeout_ms: 6000,
64            apdu_retries: 3,
65            max_apdu_length: 1476,
66            max_segments: None,
67            segmented_response_accepted: true,
68            proposed_window_size: 1,
69        }
70    }
71}
72
73/// Generic builder for BACnetClient with a pre-built transport.
74pub struct ClientBuilder<T: TransportPort> {
75    config: ClientConfig,
76    transport: Option<T>,
77}
78
79impl<T: TransportPort + 'static> ClientBuilder<T> {
80    /// Set the pre-built transport.
81    pub fn transport(mut self, transport: T) -> Self {
82        self.transport = Some(transport);
83        self
84    }
85
86    /// Set APDU timeout in milliseconds.
87    pub fn apdu_timeout_ms(mut self, ms: u64) -> Self {
88        self.config.apdu_timeout_ms = ms;
89        self
90    }
91
92    /// Set the maximum APDU length this client accepts.
93    pub fn max_apdu_length(mut self, len: u16) -> Self {
94        self.config.max_apdu_length = len;
95        self
96    }
97
98    /// Build and start the client.
99    pub async fn build(self) -> Result<BACnetClient<T>, Error> {
100        let transport = self
101            .transport
102            .ok_or_else(|| Error::Encoding("transport not set on ClientBuilder".into()))?;
103        BACnetClient::start(self.config, transport).await
104    }
105}
106
107/// BIP-specific builder that constructs `BipTransport` from interface/port/broadcast fields.
108pub struct BipClientBuilder {
109    config: ClientConfig,
110}
111
112impl BipClientBuilder {
113    /// Set the local interface IP.
114    pub fn interface(mut self, ip: Ipv4Addr) -> Self {
115        self.config.interface = ip;
116        self
117    }
118
119    /// Set the UDP port (0 for ephemeral).
120    pub fn port(mut self, port: u16) -> Self {
121        self.config.port = port;
122        self
123    }
124
125    /// Set the directed broadcast address.
126    pub fn broadcast_address(mut self, addr: Ipv4Addr) -> Self {
127        self.config.broadcast_address = addr;
128        self
129    }
130
131    /// Set APDU timeout in milliseconds.
132    pub fn apdu_timeout_ms(mut self, ms: u64) -> Self {
133        self.config.apdu_timeout_ms = ms;
134        self
135    }
136
137    /// Set the maximum APDU length this client accepts.
138    pub fn max_apdu_length(mut self, len: u16) -> Self {
139        self.config.max_apdu_length = len;
140        self
141    }
142
143    /// Build and start the client, constructing a BipTransport from the config.
144    pub async fn build(self) -> Result<BACnetClient<BipTransport>, Error> {
145        let transport = BipTransport::new(
146            self.config.interface,
147            self.config.port,
148            self.config.broadcast_address,
149        );
150        BACnetClient::start(self.config, transport).await
151    }
152}
153
154/// In-progress segmented receive state.
155struct SegmentedReceiveState {
156    receiver: SegmentReceiver,
157    /// Next expected sequence number (for gap detection).
158    expected_next_seq: u8,
159    /// Timestamp of last received segment (for reaping stale sessions).
160    last_activity: Instant,
161}
162
/// How long a segmented reassembly session may sit idle before it is
/// discarded (Clause 9.1.6).
const SEG_RECEIVER_TIMEOUT: Duration = Duration::from_millis(4_000);
165
166/// Key for tracking in-progress segmented receives: (source_mac, invoke_id).
167type SegKey = (MacAddr, u8);
168
169/// BACnet client with low-level and high-level request APIs.
170pub struct BACnetClient<T: TransportPort> {
171    config: ClientConfig,
172    network: Arc<NetworkLayer<T>>,
173    tsm: Arc<Mutex<Tsm>>,
174    device_table: Arc<Mutex<DeviceTable>>,
175    cov_tx: broadcast::Sender<COVNotificationRequest>,
176    dispatch_task: Option<JoinHandle<()>>,
177    seg_ack_senders: Arc<Mutex<HashMap<SegKey, mpsc::Sender<SegmentAckPdu>>>>,
178    local_mac: MacAddr,
179}
180
181impl BACnetClient<BipTransport> {
182    /// Create a BIP-specific builder with interface/port/broadcast fields.
183    pub fn bip_builder() -> BipClientBuilder {
184        BipClientBuilder {
185            config: ClientConfig::default(),
186        }
187    }
188
189    /// Create a BIP-specific builder (alias for backward compatibility).
190    pub fn builder() -> BipClientBuilder {
191        Self::bip_builder()
192    }
193
194    /// Read the Broadcast Distribution Table from a BBMD.
195    pub async fn read_bdt(
196        &self,
197        target: &[u8],
198    ) -> Result<Vec<bacnet_transport::bbmd::BdtEntry>, Error> {
199        self.network.transport().read_bdt(target).await
200    }
201
202    /// Write the Broadcast Distribution Table to a BBMD.
203    pub async fn write_bdt(
204        &self,
205        target: &[u8],
206        entries: &[bacnet_transport::bbmd::BdtEntry],
207    ) -> Result<bacnet_types::enums::BvlcResultCode, Error> {
208        self.network.transport().write_bdt(target, entries).await
209    }
210
211    /// Read the Foreign Device Table from a BBMD.
212    pub async fn read_fdt(
213        &self,
214        target: &[u8],
215    ) -> Result<Vec<bacnet_transport::bbmd::FdtEntryWire>, Error> {
216        self.network.transport().read_fdt(target).await
217    }
218
219    /// Delete a Foreign Device Table entry on a BBMD.
220    pub async fn delete_fdt_entry(
221        &self,
222        target: &[u8],
223        ip: [u8; 4],
224        port: u16,
225    ) -> Result<bacnet_types::enums::BvlcResultCode, Error> {
226        self.network
227            .transport()
228            .delete_fdt_entry(target, ip, port)
229            .await
230    }
231
232    /// Register as a foreign device with a BBMD and return the result code.
233    pub async fn register_foreign_device_bvlc(
234        &self,
235        target: &[u8],
236        ttl: u16,
237    ) -> Result<bacnet_types::enums::BvlcResultCode, Error> {
238        self.network
239            .transport()
240            .register_foreign_device(target, ttl)
241            .await
242    }
243}
244
245impl BACnetClient<Bip6Transport> {
246    /// Create a BIP6-specific builder for BACnet/IPv6 transport.
247    pub fn bip6_builder() -> Bip6ClientBuilder {
248        Bip6ClientBuilder {
249            config: ClientConfig::default(),
250            interface: Ipv6Addr::UNSPECIFIED,
251            device_instance: None,
252        }
253    }
254}
255
256/// BIP6-specific builder that constructs `Bip6Transport` from IPv6 interface/port/device-instance.
257pub struct Bip6ClientBuilder {
258    config: ClientConfig,
259    interface: Ipv6Addr,
260    device_instance: Option<u32>,
261}
262
263impl Bip6ClientBuilder {
264    /// Set the local IPv6 interface address.
265    pub fn interface(mut self, ip: Ipv6Addr) -> Self {
266        self.interface = ip;
267        self
268    }
269
270    /// Set the UDP port (0 for ephemeral).
271    pub fn port(mut self, port: u16) -> Self {
272        self.config.port = port;
273        self
274    }
275
276    /// Set the device instance for VMAC derivation (Annex U.5).
277    pub fn device_instance(mut self, instance: u32) -> Self {
278        self.device_instance = Some(instance);
279        self
280    }
281
282    /// Set APDU timeout in milliseconds.
283    pub fn apdu_timeout_ms(mut self, ms: u64) -> Self {
284        self.config.apdu_timeout_ms = ms;
285        self
286    }
287
288    /// Set the maximum APDU length this client accepts.
289    pub fn max_apdu_length(mut self, len: u16) -> Self {
290        self.config.max_apdu_length = len;
291        self
292    }
293
294    /// Build and start the client, constructing a Bip6Transport from the config.
295    pub async fn build(self) -> Result<BACnetClient<Bip6Transport>, Error> {
296        let transport = Bip6Transport::new(self.interface, self.config.port, self.device_instance);
297        BACnetClient::start(self.config, transport).await
298    }
299}
300
#[cfg(feature = "sc-tls")]
impl BACnetClient<bacnet_transport::sc::ScTransport<bacnet_transport::sc_tls::TlsWebSocket>> {
    /// Start an SC-specific builder that connects to a BACnet/SC hub.
    ///
    /// Initial values: empty hub URL, no TLS configuration, all-zero VMAC,
    /// 30 s heartbeat interval, 60 s heartbeat timeout, no reconnection.
    pub fn sc_builder() -> ScClientBuilder {
        ScClientBuilder {
            hub_url: String::new(),
            tls_config: None,
            reconnect: None,
            vmac: [0; 6],
            heartbeat_interval_ms: 30_000,
            heartbeat_timeout_ms: 60_000,
            config: ClientConfig::default(),
        }
    }
}
316
/// Builder for a client that talks to a BACnet/SC hub.
///
/// Obtained from [`BACnetClient::sc_builder()`].  Only available with the
/// `sc-tls` feature.
#[cfg(feature = "sc-tls")]
pub struct ScClientBuilder {
    /// Settings handed to the client when it is started.
    config: ClientConfig,
    /// WebSocket URL of the hub.
    hub_url: String,
    /// TLS client configuration; `build` fails while this is `None`.
    tls_config: Option<std::sync::Arc<tokio_rustls::rustls::ClientConfig>>,
    /// Local VMAC address.
    vmac: bacnet_transport::sc_frame::Vmac,
    /// Heartbeat interval in milliseconds.
    heartbeat_interval_ms: u64,
    /// Heartbeat timeout in milliseconds.
    heartbeat_timeout_ms: u64,
    /// Reconnection settings; `None` leaves reconnection unconfigured.
    reconnect: Option<bacnet_transport::sc::ScReconnectConfig>,
}
330
#[cfg(feature = "sc-tls")]
impl ScClientBuilder {
    /// Set the hub WebSocket URL (e.g. `wss://hub.example.com/bacnet`).
    pub fn hub_url(mut self, url: &str) -> Self {
        self.hub_url = url.to_string();
        self
    }

    /// Set the TLS client configuration.
    ///
    /// This is mandatory: [`Self::build`] returns an error without it.
    pub fn tls_config(
        mut self,
        config: std::sync::Arc<tokio_rustls::rustls::ClientConfig>,
    ) -> Self {
        self.tls_config = Some(config);
        self
    }

    /// Set the local VMAC address.
    pub fn vmac(mut self, vmac: [u8; 6]) -> Self {
        self.vmac = vmac;
        self
    }

    /// Set the APDU timeout in milliseconds.
    pub fn apdu_timeout_ms(mut self, ms: u64) -> Self {
        self.config.apdu_timeout_ms = ms;
        self
    }

    /// Set the maximum APDU length this client accepts.
    ///
    /// Mirrors the setter offered by the other builders in this module.
    pub fn max_apdu_length(mut self, len: u16) -> Self {
        self.config.max_apdu_length = len;
        self
    }

    /// Set the heartbeat interval in milliseconds (default 30 000).
    pub fn heartbeat_interval_ms(mut self, ms: u64) -> Self {
        self.heartbeat_interval_ms = ms;
        self
    }

    /// Set the heartbeat timeout in milliseconds (default 60 000).
    pub fn heartbeat_timeout_ms(mut self, ms: u64) -> Self {
        self.heartbeat_timeout_ms = ms;
        self
    }

    /// Enable automatic reconnection with the given configuration.
    pub fn reconnect(mut self, config: bacnet_transport::sc::ScReconnectConfig) -> Self {
        self.reconnect = Some(config);
        self
    }

    /// Connect to the hub and start the client.
    ///
    /// # Errors
    ///
    /// Returns `Error::Encoding` when no TLS configuration was supplied, and
    /// propagates any error from the WebSocket connect or from
    /// `BACnetClient::start`.
    pub async fn build(
        self,
    ) -> Result<
        BACnetClient<bacnet_transport::sc::ScTransport<bacnet_transport::sc_tls::TlsWebSocket>>,
        Error,
    > {
        let tls_config = self
            .tls_config
            .ok_or_else(|| Error::Encoding("SC client builder: tls_config is required".into()))?;

        let ws = bacnet_transport::sc_tls::TlsWebSocket::connect(&self.hub_url, tls_config).await?;

        let mut transport = bacnet_transport::sc::ScTransport::new(ws, self.vmac)
            .with_heartbeat_interval_ms(self.heartbeat_interval_ms)
            .with_heartbeat_timeout_ms(self.heartbeat_timeout_ms);
        if let Some(rc) = self.reconnect {
            // NOTE(review): the `allow` suggests `with_reconnect` is deprecated
            // upstream; confirm the intended replacement before removing it.
            #[allow(deprecated)]
            {
                transport = transport.with_reconnect(rc);
            }
        }

        BACnetClient::start(self.config, transport).await
    }
}
404
405impl<T: TransportPort + 'static> BACnetClient<T> {
406    /// Create a generic builder that accepts a pre-built transport.
407    pub fn generic_builder() -> ClientBuilder<T> {
408        ClientBuilder {
409            config: ClientConfig::default(),
410            transport: None,
411        }
412    }
413
414    /// Start the client: bind transport, start network layer, spawn dispatch.
415    pub async fn start(mut config: ClientConfig, transport: T) -> Result<Self, Error> {
416        // Clamp max_apdu_length to the transport's physical limit.
417        let transport_max = transport.max_apdu_length();
418        config.max_apdu_length = config.max_apdu_length.min(transport_max);
419
420        let mut network = NetworkLayer::new(transport);
421        let mut apdu_rx = network.start().await?;
422        let local_mac = MacAddr::from_slice(network.local_mac());
423
424        let network = Arc::new(network);
425
426        let tsm_config = TsmConfig {
427            apdu_timeout_ms: config.apdu_timeout_ms,
428            apdu_retries: config.apdu_retries,
429        };
430        let tsm = Arc::new(Mutex::new(Tsm::new(tsm_config)));
431        let tsm_dispatch = Arc::clone(&tsm);
432        let device_table = Arc::new(Mutex::new(DeviceTable::new()));
433        let device_table_dispatch = Arc::clone(&device_table);
434        let network_dispatch = Arc::clone(&network);
435        let (cov_tx, _) = broadcast::channel::<COVNotificationRequest>(64);
436        let cov_tx_dispatch = cov_tx.clone();
437        let seg_ack_senders: Arc<Mutex<HashMap<SegKey, mpsc::Sender<SegmentAckPdu>>>> =
438            Arc::new(Mutex::new(HashMap::new()));
439        let seg_ack_senders_dispatch = Arc::clone(&seg_ack_senders);
440
441        // Spawn APDU dispatch task
442        let dispatch_task = tokio::spawn(async move {
443            // Segmented receive state is task-local — no external sharing needed.
444            let mut seg_state: HashMap<SegKey, SegmentedReceiveState> = HashMap::new();
445
446            while let Some(received) = apdu_rx.recv().await {
447                // Reap timed-out segmented reassembly sessions (Clause 9.1.6).
448                let now = Instant::now();
449                seg_state.retain(|_key, state| {
450                    now.duration_since(state.last_activity) < SEG_RECEIVER_TIMEOUT
451                });
452
453                match apdu::decode_apdu(received.apdu.clone()) {
454                    Ok(decoded) => {
455                        Self::dispatch_apdu(
456                            &tsm_dispatch,
457                            &device_table_dispatch,
458                            &network_dispatch,
459                            &cov_tx_dispatch,
460                            &mut seg_state,
461                            &seg_ack_senders_dispatch,
462                            &received.source_mac,
463                            decoded,
464                        )
465                        .await;
466                    }
467                    Err(e) => {
468                        warn!(error = %e, "Failed to decode received APDU");
469                    }
470                }
471            }
472        });
473
474        Ok(Self {
475            config,
476            network,
477            tsm,
478            device_table,
479            cov_tx,
480            dispatch_task: Some(dispatch_task),
481            seg_ack_senders,
482            local_mac,
483        })
484    }
485
486    /// Dispatch a received APDU to the appropriate TSM handler.
487    #[allow(clippy::too_many_arguments)]
488    async fn dispatch_apdu(
489        tsm: &Arc<Mutex<Tsm>>,
490        device_table: &Arc<Mutex<DeviceTable>>,
491        network: &Arc<NetworkLayer<T>>,
492        cov_tx: &broadcast::Sender<COVNotificationRequest>,
493        seg_state: &mut HashMap<SegKey, SegmentedReceiveState>,
494        seg_ack_senders: &Arc<Mutex<HashMap<SegKey, mpsc::Sender<SegmentAckPdu>>>>,
495        source_mac: &[u8],
496        apdu: Apdu,
497    ) {
498        match apdu {
499            Apdu::SimpleAck(ack) => {
500                debug!(invoke_id = ack.invoke_id, "Received SimpleAck");
501                let mut tsm = tsm.lock().await;
502                tsm.complete_transaction(source_mac, ack.invoke_id, TsmResponse::SimpleAck);
503            }
504            Apdu::ComplexAck(ack) => {
505                if ack.segmented {
506                    Self::handle_segmented_complex_ack(tsm, network, seg_state, source_mac, ack)
507                        .await;
508                } else {
509                    debug!(invoke_id = ack.invoke_id, "Received ComplexAck");
510                    let mut tsm = tsm.lock().await;
511                    tsm.complete_transaction(
512                        source_mac,
513                        ack.invoke_id,
514                        TsmResponse::ComplexAck {
515                            service_data: ack.service_ack,
516                        },
517                    );
518                }
519            }
520            Apdu::Error(err) => {
521                debug!(invoke_id = err.invoke_id, "Received Error PDU");
522                let mut tsm = tsm.lock().await;
523                tsm.complete_transaction(
524                    source_mac,
525                    err.invoke_id,
526                    TsmResponse::Error {
527                        class: err.error_class.to_raw() as u32,
528                        code: err.error_code.to_raw() as u32,
529                    },
530                );
531            }
532            Apdu::Reject(rej) => {
533                debug!(invoke_id = rej.invoke_id, "Received Reject PDU");
534                let mut tsm = tsm.lock().await;
535                tsm.complete_transaction(
536                    source_mac,
537                    rej.invoke_id,
538                    TsmResponse::Reject {
539                        reason: rej.reject_reason.to_raw(),
540                    },
541                );
542            }
543            Apdu::Abort(abt) => {
544                debug!(invoke_id = abt.invoke_id, "Received Abort PDU");
545                let mut tsm = tsm.lock().await;
546                tsm.complete_transaction(
547                    source_mac,
548                    abt.invoke_id,
549                    TsmResponse::Abort {
550                        reason: abt.abort_reason.to_raw(),
551                    },
552                );
553            }
554            Apdu::ConfirmedRequest(req) => {
555                // Handle ConfirmedCOVNotification from a server
556                if req.service_choice == ConfirmedServiceChoice::CONFIRMED_COV_NOTIFICATION {
557                    match COVNotificationRequest::decode(&req.service_request) {
558                        Ok(notification) => {
559                            debug!(
560                                object = ?notification.monitored_object_identifier,
561                                "Received ConfirmedCOVNotification"
562                            );
563                            let _ = cov_tx.send(notification);
564
565                            // Send SimpleAck back
566                            let ack = Apdu::SimpleAck(SimpleAck {
567                                invoke_id: req.invoke_id,
568                                service_choice: req.service_choice,
569                            });
570                            let mut buf = BytesMut::with_capacity(4);
571                            encode_apdu(&mut buf, &ack);
572                            if let Err(e) = network
573                                .send_apdu(&buf, source_mac, false, NetworkPriority::NORMAL)
574                                .await
575                            {
576                                warn!(error = %e, "Failed to send SimpleAck for COV notification");
577                            }
578                        }
579                        Err(e) => {
580                            warn!(error = %e, "Failed to decode ConfirmedCOVNotification");
581                        }
582                    }
583                } else {
584                    debug!(
585                        service = req.service_choice.to_raw(),
586                        "Ignoring ConfirmedRequest (client mode)"
587                    );
588                }
589            }
590            Apdu::UnconfirmedRequest(req) => {
591                if req.service_choice == UnconfirmedServiceChoice::I_AM {
592                    match bacnet_services::who_is::IAmRequest::decode(&req.service_request) {
593                        Ok(i_am) => {
594                            debug!(
595                                device = i_am.object_identifier.instance_number(),
596                                vendor = i_am.vendor_id,
597                                "Received IAm"
598                            );
599                            let device = DiscoveredDevice {
600                                object_identifier: i_am.object_identifier,
601                                mac_address: MacAddr::from_slice(source_mac),
602                                max_apdu_length: i_am.max_apdu_length,
603                                segmentation_supported: i_am.segmentation_supported,
604                                max_segments_accepted: None,
605                                vendor_id: i_am.vendor_id,
606                                last_seen: std::time::Instant::now(),
607                            };
608                            device_table.lock().await.upsert(device);
609                        }
610                        Err(e) => {
611                            warn!(error = %e, "Failed to decode IAm");
612                        }
613                    }
614                } else if req.service_choice
615                    == UnconfirmedServiceChoice::UNCONFIRMED_COV_NOTIFICATION
616                {
617                    match COVNotificationRequest::decode(&req.service_request) {
618                        Ok(notification) => {
619                            debug!(
620                                object = ?notification.monitored_object_identifier,
621                                "Received UnconfirmedCOVNotification"
622                            );
623                            let _ = cov_tx.send(notification);
624                        }
625                        Err(e) => {
626                            warn!(error = %e, "Failed to decode UnconfirmedCOVNotification");
627                        }
628                    }
629                } else {
630                    debug!(
631                        service = req.service_choice.to_raw(),
632                        "Ignoring unconfirmed service in client dispatch"
633                    );
634                }
635            }
636            Apdu::SegmentAck(sa) => {
637                // Forward to the segmented send in progress for this transaction.
638                let key = (MacAddr::from_slice(source_mac), sa.invoke_id);
639                let senders = seg_ack_senders.lock().await;
640                if let Some(tx) = senders.get(&key) {
641                    let _ = tx.try_send(sa);
642                } else {
643                    debug!(
644                        invoke_id = sa.invoke_id,
645                        "Ignoring SegmentAck for unknown transaction"
646                    );
647                }
648            }
649        }
650    }
651
652    /// Handle a segmented ComplexAck: accumulate segments, send SegmentAck,
653    /// reassemble and complete TSM transaction when all segments received.
654    async fn handle_segmented_complex_ack(
655        tsm: &Arc<Mutex<Tsm>>,
656        network: &Arc<NetworkLayer<T>>,
657        seg_state: &mut HashMap<SegKey, SegmentedReceiveState>,
658        source_mac: &[u8],
659        ack: bacnet_encoding::apdu::ComplexAck,
660    ) {
661        let seq = ack.sequence_number.unwrap_or(0);
662        let key = (MacAddr::from_slice(source_mac), ack.invoke_id);
663
664        debug!(
665            invoke_id = ack.invoke_id,
666            seq = seq,
667            more = ack.more_follows,
668            "Received segmented ComplexAck"
669        );
670
671        // Cap concurrent segmented sessions to prevent resource exhaustion
672        const MAX_CONCURRENT_SEG_SESSIONS: usize = 64;
673        if !seg_state.contains_key(&key) && seg_state.len() >= MAX_CONCURRENT_SEG_SESSIONS {
674            warn!(
675                invoke_id = ack.invoke_id,
676                sessions = seg_state.len(),
677                "Max concurrent segmented sessions reached, dropping segment"
678            );
679            return;
680        }
681
682        // Get or create the receive state for this transaction
683        let state = seg_state
684            .entry(key.clone())
685            .or_insert_with(|| SegmentedReceiveState {
686                receiver: SegmentReceiver::new(),
687                expected_next_seq: 0,
688                last_activity: Instant::now(),
689            });
690
691        // Update activity timestamp for this session.
692        state.last_activity = Instant::now();
693
694        // Gap detection: if the sequence number doesn't match expected, send
695        // a negative SegmentAck requesting retransmission from the last
696        // contiguous sequence number.
697        if seq != state.expected_next_seq {
698            warn!(
699                invoke_id = ack.invoke_id,
700                expected = state.expected_next_seq,
701                received = seq,
702                "Segment gap detected, sending negative SegmentAck"
703            );
704            let neg_ack = Apdu::SegmentAck(SegmentAckPdu {
705                negative_ack: true,
706                sent_by_server: false,
707                invoke_id: ack.invoke_id,
708                sequence_number: state.expected_next_seq,
709                actual_window_size: ack.proposed_window_size.unwrap_or(1),
710            });
711            let mut buf = BytesMut::with_capacity(4);
712            encode_apdu(&mut buf, &neg_ack);
713            if let Err(e) = network
714                .send_apdu(&buf, source_mac, false, NetworkPriority::NORMAL)
715                .await
716            {
717                warn!(error = %e, "Failed to send negative SegmentAck");
718            }
719            return;
720        }
721
722        // Store this segment and advance expected sequence
723        if let Err(e) = state.receiver.receive(seq, ack.service_ack) {
724            warn!(error = %e, "Rejecting oversized segment");
725            return;
726        }
727        state.expected_next_seq = seq.wrapping_add(1);
728
729        // Send SegmentAck to acknowledge receipt
730        let seg_ack = Apdu::SegmentAck(SegmentAckPdu {
731            negative_ack: false,
732            sent_by_server: false,
733            invoke_id: ack.invoke_id,
734            sequence_number: seq,
735            actual_window_size: ack.proposed_window_size.unwrap_or(1),
736        });
737        let mut buf = BytesMut::with_capacity(4);
738        encode_apdu(&mut buf, &seg_ack);
739        if let Err(e) = network
740            .send_apdu(&buf, source_mac, false, NetworkPriority::NORMAL)
741            .await
742        {
743            warn!(error = %e, "Failed to send SegmentAck");
744        }
745
746        // If this is the last segment, reassemble and complete the transaction.
747        // Gap detection: reassemble() validates that segments 0..total are all
748        // present; any missing sequence number produces an Err.
749        if !ack.more_follows {
750            let state = seg_state.remove(&key).unwrap();
751            let total = state.receiver.received_count();
752            match state.receiver.reassemble(total) {
753                Ok(service_data) => {
754                    debug!(
755                        invoke_id = ack.invoke_id,
756                        segments = total,
757                        bytes = service_data.len(),
758                        "Reassembled segmented ComplexAck"
759                    );
760                    let mut tsm = tsm.lock().await;
761                    tsm.complete_transaction(
762                        source_mac,
763                        ack.invoke_id,
764                        TsmResponse::ComplexAck {
765                            service_data: Bytes::from(service_data),
766                        },
767                    );
768                }
769                Err(e) => {
770                    warn!(error = %e, "Failed to reassemble segmented ComplexAck");
771                }
772            }
773        }
774    }
775
776    /// Get the client's local MAC address.
777    pub fn local_mac(&self) -> &[u8] {
778        &self.local_mac
779    }
780
781    // -----------------------------------------------------------------------
782    // Low-level API
783    // -----------------------------------------------------------------------
784
785    /// Send a confirmed request and wait for the response.
786    ///
787    /// Returns the service response data (empty `Vec` for SimpleAck).
788    /// Returns an error on timeout, protocol error, reject, or abort.
789    ///
790    /// Automatically uses segmented transfer when the payload exceeds the
791    /// remote device's max APDU length.
792    pub async fn confirmed_request(
793        &self,
794        destination_mac: &[u8],
795        service_choice: ConfirmedServiceChoice,
796        service_data: &[u8],
797    ) -> Result<Bytes, Error> {
798        // Check if segmentation is needed.
799        // Non-segmented ConfirmedRequest overhead: 4 bytes (type+flags, max-seg/apdu, invoke, service).
800        let unsegmented_apdu_size = 4 + service_data.len();
801        let (remote_max_apdu, remote_max_segments) = {
802            let dt = self.device_table.lock().await;
803            let device = dt.get_by_mac(destination_mac);
804            let max_apdu = device
805                .map(|d| d.max_apdu_length as u16)
806                .unwrap_or(self.config.max_apdu_length);
807            let max_seg = device.and_then(|d| d.max_segments_accepted);
808            (max_apdu, max_seg)
809        };
810        if unsegmented_apdu_size > remote_max_apdu as usize {
811            return self
812                .segmented_confirmed_request(
813                    destination_mac,
814                    service_choice,
815                    service_data,
816                    remote_max_apdu,
817                    remote_max_segments,
818                )
819                .await;
820        }
821
822        // Allocate invoke ID and register transaction
823        let (invoke_id, rx) = {
824            let mut tsm = self.tsm.lock().await;
825            let invoke_id = tsm.allocate_invoke_id(destination_mac).ok_or_else(|| {
826                Error::Encoding("all invoke IDs exhausted for destination".into())
827            })?;
828            let rx = tsm.register_transaction(MacAddr::from_slice(destination_mac), invoke_id);
829            (invoke_id, rx)
830        };
831
832        // Build ConfirmedRequest APDU
833        let pdu = Apdu::ConfirmedRequest(ConfirmedRequestPdu {
834            segmented: false,
835            more_follows: false,
836            segmented_response_accepted: self.config.segmented_response_accepted,
837            max_segments: self.config.max_segments,
838            max_apdu_length: self.config.max_apdu_length,
839            invoke_id,
840            sequence_number: None,
841            proposed_window_size: None,
842            service_choice,
843            service_request: Bytes::copy_from_slice(service_data),
844        });
845
846        let mut buf = BytesMut::with_capacity(6 + service_data.len());
847        encode_apdu(&mut buf, &pdu);
848
849        // Retry loop per Clause 5.4.2: retransmit on timeout up to apdu_retries times.
850        // The invoke_id stays the same across retries. The TSM transaction is NOT
851        // cancelled between retries — only on final timeout or non-timeout error.
852        let timeout_duration = Duration::from_millis(self.config.apdu_timeout_ms);
853        let max_retries = self.config.apdu_retries;
854        let mut attempts: u8 = 0;
855        let mut rx = rx;
856
857        loop {
858            if let Err(e) = self
859                .network
860                .send_apdu(&buf, destination_mac, true, NetworkPriority::NORMAL)
861                .await
862            {
863                // Clean up the invoke ID on send failure to prevent pool exhaustion
864                let mut tsm = self.tsm.lock().await;
865                tsm.cancel_transaction(destination_mac, invoke_id);
866                return Err(e);
867            }
868
869            match timeout(timeout_duration, &mut rx).await {
870                Ok(Ok(response)) => {
871                    // Response received — convert TsmResponse to Result
872                    return match response {
873                        TsmResponse::SimpleAck => Ok(Bytes::new()),
874                        TsmResponse::ComplexAck { service_data } => Ok(service_data),
875                        TsmResponse::Error { class, code } => Err(Error::Protocol { class, code }),
876                        TsmResponse::Reject { reason } => Err(Error::Reject { reason }),
877                        TsmResponse::Abort { reason } => Err(Error::Abort { reason }),
878                    };
879                }
880                Ok(Err(_)) => {
881                    // Channel closed — TSM transaction was cancelled externally
882                    return Err(Error::Encoding("TSM response channel closed".into()));
883                }
884                Err(_timeout) => {
885                    attempts += 1;
886                    if attempts > max_retries {
887                        // Final timeout — cancel TSM transaction and return error
888                        let mut tsm = self.tsm.lock().await;
889                        tsm.cancel_transaction(destination_mac, invoke_id);
890                        return Err(Error::Timeout(timeout_duration));
891                    }
892                    debug!(
893                        invoke_id,
894                        attempt = attempts,
895                        max_retries,
896                        "APDU timeout, retrying confirmed request"
897                    );
898                }
899            }
900        }
901    }
902
903    /// Send a confirmed request using segmented transfer.
904    ///
905    /// Splits the service data into segments, sends them with windowed flow
906    /// control (SegmentAck from server), then waits for the final response.
907    async fn segmented_confirmed_request(
908        &self,
909        destination_mac: &[u8],
910        service_choice: ConfirmedServiceChoice,
911        service_data: &[u8],
912        remote_max_apdu: u16,
913        remote_max_segments: Option<u32>,
914    ) -> Result<Bytes, Error> {
915        let max_seg_size = max_segment_payload(remote_max_apdu, SegmentedPduType::ConfirmedRequest);
916        let segments = split_payload(service_data, max_seg_size);
917        let total_segments = segments.len();
918
919        if total_segments > 255 {
920            return Err(Error::Segmentation(format!(
921                "payload requires {} segments, maximum is 255",
922                total_segments
923            )));
924        }
925
926        if let Some(max_seg) = remote_max_segments {
927            if total_segments > max_seg as usize {
928                return Err(Error::Segmentation(format!(
929                    "request requires {} segments but remote accepts at most {}",
930                    total_segments, max_seg
931                )));
932            }
933        }
934
935        debug!(
936            total_segments,
937            max_seg_size,
938            service_data_len = service_data.len(),
939            "Starting segmented confirmed request"
940        );
941
942        // Allocate invoke ID and register TSM transaction (for final response).
943        let (invoke_id, rx) = {
944            let mut tsm = self.tsm.lock().await;
945            let invoke_id = tsm.allocate_invoke_id(destination_mac).ok_or_else(|| {
946                Error::Encoding("all invoke IDs exhausted for destination".into())
947            })?;
948            let rx = tsm.register_transaction(MacAddr::from_slice(destination_mac), invoke_id);
949            (invoke_id, rx)
950        };
951
952        // Register a channel for receiving SegmentAck PDUs during the send.
953        let (seg_ack_tx, mut seg_ack_rx) = mpsc::channel(16);
954        {
955            let key = (MacAddr::from_slice(destination_mac), invoke_id);
956            self.seg_ack_senders.lock().await.insert(key, seg_ack_tx);
957        }
958
959        let timeout_duration = Duration::from_millis(self.config.apdu_timeout_ms);
960        let max_ack_retries = self.config.apdu_retries;
961        let mut window_size = self.config.proposed_window_size.max(1) as usize;
962        let mut next_seq: usize = 0;
963        let mut neg_ack_retries: u32 = 0;
964        const MAX_NEG_ACK_RETRIES: u32 = 10;
965
966        // Send segments in windows, waiting for SegmentAck after each window.
967        let result = async {
968            while next_seq < total_segments {
969                let window_end = (next_seq + window_size).min(total_segments);
970
971                for (seq, segment_data) in segments[next_seq..window_end]
972                    .iter()
973                    .enumerate()
974                    .map(|(i, s)| (next_seq + i, s))
975                {
976                    let is_last = seq == total_segments - 1;
977                    let pdu = Apdu::ConfirmedRequest(ConfirmedRequestPdu {
978                        segmented: true,
979                        more_follows: !is_last,
980                        segmented_response_accepted: self.config.segmented_response_accepted,
981                        max_segments: self.config.max_segments,
982                        max_apdu_length: self.config.max_apdu_length,
983                        invoke_id,
984                        sequence_number: Some(seq as u8),
985                        proposed_window_size: Some(self.config.proposed_window_size.max(1)),
986                        service_choice,
987                        service_request: segment_data.clone(),
988                    });
989
990                    let mut buf = BytesMut::with_capacity(remote_max_apdu as usize);
991                    encode_apdu(&mut buf, &pdu);
992
993                    self.network
994                        .send_apdu(&buf, destination_mac, true, NetworkPriority::NORMAL)
995                        .await?;
996
997                    debug!(seq, is_last, "Sent segment");
998                }
999
1000                // Wait for SegmentAck with retry logic matching the unsegmented path.
1001                let ack = {
1002                    let mut ack_retries: u8 = 0;
1003                    loop {
1004                        match timeout(timeout_duration, seg_ack_rx.recv()).await {
1005                            Ok(Some(ack)) => break ack,
1006                            Ok(None) => {
1007                                return Err(Error::Encoding("SegmentAck channel closed".into()));
1008                            }
1009                            Err(_timeout) => {
1010                                ack_retries += 1;
1011                                if ack_retries > max_ack_retries {
1012                                    return Err(Error::Timeout(timeout_duration));
1013                                }
1014                                warn!(
1015                                    attempt = ack_retries,
1016                                    "Retransmitting segmented request window"
1017                                );
1018                                // Retransmit the current window.
1019                                for (seq, segment_data) in segments[next_seq..window_end]
1020                                    .iter()
1021                                    .enumerate()
1022                                    .map(|(i, s)| (next_seq + i, s))
1023                                {
1024                                    let is_last = seq == total_segments - 1;
1025                                    let pdu = Apdu::ConfirmedRequest(ConfirmedRequestPdu {
1026                                        segmented: true,
1027                                        more_follows: !is_last,
1028                                        segmented_response_accepted: self
1029                                            .config
1030                                            .segmented_response_accepted,
1031                                        max_segments: self.config.max_segments,
1032                                        max_apdu_length: self.config.max_apdu_length,
1033                                        invoke_id,
1034                                        sequence_number: Some(seq as u8),
1035                                        proposed_window_size: Some(
1036                                            self.config.proposed_window_size.max(1),
1037                                        ),
1038                                        service_choice,
1039                                        service_request: segment_data.clone(),
1040                                    });
1041
1042                                    let mut buf = BytesMut::with_capacity(remote_max_apdu as usize);
1043                                    encode_apdu(&mut buf, &pdu);
1044
1045                                    self.network
1046                                        .send_apdu(
1047                                            &buf,
1048                                            destination_mac,
1049                                            true,
1050                                            NetworkPriority::NORMAL,
1051                                        )
1052                                        .await?;
1053                                }
1054                            }
1055                        }
1056                    }
1057                };
1058
1059                debug!(
1060                    seq = ack.sequence_number,
1061                    negative = ack.negative_ack,
1062                    window = ack.actual_window_size,
1063                    "Received SegmentAck"
1064                );
1065
1066                // Update window size from server's response.
1067                window_size = ack.actual_window_size.max(1) as usize;
1068
1069                if ack.negative_ack {
1070                    neg_ack_retries += 1;
1071                    if neg_ack_retries > MAX_NEG_ACK_RETRIES {
1072                        return Err(Error::Segmentation(
1073                            "too many negative SegmentAck retransmissions".into(),
1074                        ));
1075                    }
1076                    // Server is requesting retransmission from this sequence.
1077                    next_seq = ack.sequence_number as usize;
1078                } else {
1079                    neg_ack_retries = 0;
1080                    // Advance past the acknowledged segment.
1081                    next_seq = ack.sequence_number as usize + 1;
1082                }
1083            }
1084
1085            // All segments sent and acknowledged. Wait for final response via TSM.
1086            timeout(timeout_duration, rx)
1087                .await
1088                .map_err(|_| Error::Timeout(timeout_duration))?
1089                .map_err(|_| Error::Encoding("TSM response channel closed".into()))
1090        }
1091        .await;
1092
1093        // Clean up seg_ack channel regardless of outcome.
1094        {
1095            let key = (MacAddr::from_slice(destination_mac), invoke_id);
1096            self.seg_ack_senders.lock().await.remove(&key);
1097        }
1098
1099        // On error, cancel the TSM transaction.
1100        let response = match result {
1101            Ok(response) => response,
1102            Err(e) => {
1103                let mut tsm = self.tsm.lock().await;
1104                tsm.cancel_transaction(destination_mac, invoke_id);
1105                return Err(e);
1106            }
1107        };
1108
1109        match response {
1110            TsmResponse::SimpleAck => Ok(Bytes::new()),
1111            TsmResponse::ComplexAck { service_data } => Ok(service_data),
1112            TsmResponse::Error { class, code } => Err(Error::Protocol { class, code }),
1113            TsmResponse::Reject { reason } => Err(Error::Reject { reason }),
1114            TsmResponse::Abort { reason } => Err(Error::Abort { reason }),
1115        }
1116    }
1117
1118    /// Send an unconfirmed request (fire-and-forget) to a specific destination.
1119    pub async fn unconfirmed_request(
1120        &self,
1121        destination_mac: &[u8],
1122        service_choice: UnconfirmedServiceChoice,
1123        service_data: &[u8],
1124    ) -> Result<(), Error> {
1125        let pdu = Apdu::UnconfirmedRequest(bacnet_encoding::apdu::UnconfirmedRequest {
1126            service_choice,
1127            service_request: Bytes::copy_from_slice(service_data),
1128        });
1129
1130        let mut buf = BytesMut::with_capacity(2 + service_data.len());
1131        encode_apdu(&mut buf, &pdu);
1132
1133        self.network
1134            .send_apdu(&buf, destination_mac, false, NetworkPriority::NORMAL)
1135            .await
1136    }
1137
1138    /// Broadcast an unconfirmed request on the local network.
1139    pub async fn broadcast_unconfirmed(
1140        &self,
1141        service_choice: UnconfirmedServiceChoice,
1142        service_data: &[u8],
1143    ) -> Result<(), Error> {
1144        let pdu = Apdu::UnconfirmedRequest(bacnet_encoding::apdu::UnconfirmedRequest {
1145            service_choice,
1146            service_request: Bytes::copy_from_slice(service_data),
1147        });
1148
1149        let mut buf = BytesMut::with_capacity(2 + service_data.len());
1150        encode_apdu(&mut buf, &pdu);
1151
1152        self.network
1153            .broadcast_apdu(&buf, false, NetworkPriority::NORMAL)
1154            .await
1155    }
1156
1157    /// Broadcast an unconfirmed request globally (DNET=0xFFFF).
1158    ///
1159    /// Unlike `broadcast_unconfirmed()` which only reaches the local subnet,
1160    /// this sends a global broadcast that routers will forward to all networks.
1161    pub async fn broadcast_global_unconfirmed(
1162        &self,
1163        service_choice: UnconfirmedServiceChoice,
1164        service_data: &[u8],
1165    ) -> Result<(), Error> {
1166        let pdu = Apdu::UnconfirmedRequest(bacnet_encoding::apdu::UnconfirmedRequest {
1167            service_choice,
1168            service_request: Bytes::copy_from_slice(service_data),
1169        });
1170
1171        let mut buf = BytesMut::with_capacity(2 + service_data.len());
1172        encode_apdu(&mut buf, &pdu);
1173
1174        self.network
1175            .broadcast_global_apdu(&buf, false, NetworkPriority::NORMAL)
1176            .await
1177    }
1178
1179    /// Broadcast an unconfirmed request to a specific remote network.
1180    pub async fn broadcast_network_unconfirmed(
1181        &self,
1182        service_choice: UnconfirmedServiceChoice,
1183        service_data: &[u8],
1184        dest_network: u16,
1185    ) -> Result<(), Error> {
1186        let pdu = Apdu::UnconfirmedRequest(bacnet_encoding::apdu::UnconfirmedRequest {
1187            service_choice,
1188            service_request: Bytes::copy_from_slice(service_data),
1189        });
1190
1191        let mut buf = BytesMut::with_capacity(2 + service_data.len());
1192        encode_apdu(&mut buf, &pdu);
1193
1194        self.network
1195            .broadcast_to_network(&buf, dest_network, false, NetworkPriority::NORMAL)
1196            .await
1197    }
1198
1199    // -----------------------------------------------------------------------
1200    // High-level API
1201    // -----------------------------------------------------------------------
1202
1203    /// Read a property from a remote device.
1204    pub async fn read_property(
1205        &self,
1206        destination_mac: &[u8],
1207        object_identifier: bacnet_types::primitives::ObjectIdentifier,
1208        property_identifier: bacnet_types::enums::PropertyIdentifier,
1209        property_array_index: Option<u32>,
1210    ) -> Result<bacnet_services::read_property::ReadPropertyACK, Error> {
1211        use bacnet_services::read_property::ReadPropertyRequest;
1212
1213        let request = ReadPropertyRequest {
1214            object_identifier,
1215            property_identifier,
1216            property_array_index,
1217        };
1218        let mut buf = BytesMut::new();
1219        request.encode(&mut buf);
1220
1221        let response_data = self
1222            .confirmed_request(destination_mac, ConfirmedServiceChoice::READ_PROPERTY, &buf)
1223            .await?;
1224
1225        bacnet_services::read_property::ReadPropertyACK::decode(&response_data)
1226    }
1227
1228    /// Write a property on a remote device.
1229    pub async fn write_property(
1230        &self,
1231        destination_mac: &[u8],
1232        object_identifier: bacnet_types::primitives::ObjectIdentifier,
1233        property_identifier: bacnet_types::enums::PropertyIdentifier,
1234        property_array_index: Option<u32>,
1235        property_value: Vec<u8>,
1236        priority: Option<u8>,
1237    ) -> Result<(), Error> {
1238        use bacnet_services::write_property::WritePropertyRequest;
1239
1240        let request = WritePropertyRequest {
1241            object_identifier,
1242            property_identifier,
1243            property_array_index,
1244            property_value,
1245            priority,
1246        };
1247        let mut buf = BytesMut::new();
1248        request.encode(&mut buf);
1249
1250        let _ = self
1251            .confirmed_request(
1252                destination_mac,
1253                ConfirmedServiceChoice::WRITE_PROPERTY,
1254                &buf,
1255            )
1256            .await?;
1257
1258        Ok(())
1259    }
1260
1261    /// Read multiple properties from one or more objects on a remote device.
1262    pub async fn read_property_multiple(
1263        &self,
1264        destination_mac: &[u8],
1265        specs: Vec<bacnet_services::rpm::ReadAccessSpecification>,
1266    ) -> Result<bacnet_services::rpm::ReadPropertyMultipleACK, Error> {
1267        use bacnet_services::rpm::{ReadPropertyMultipleACK, ReadPropertyMultipleRequest};
1268
1269        let request = ReadPropertyMultipleRequest {
1270            list_of_read_access_specs: specs,
1271        };
1272        let mut buf = BytesMut::new();
1273        request.encode(&mut buf);
1274
1275        let response_data = self
1276            .confirmed_request(
1277                destination_mac,
1278                ConfirmedServiceChoice::READ_PROPERTY_MULTIPLE,
1279                &buf,
1280            )
1281            .await?;
1282
1283        ReadPropertyMultipleACK::decode(&response_data)
1284    }
1285
1286    /// Write multiple properties on one or more objects on a remote device.
1287    pub async fn write_property_multiple(
1288        &self,
1289        destination_mac: &[u8],
1290        specs: Vec<bacnet_services::wpm::WriteAccessSpecification>,
1291    ) -> Result<(), Error> {
1292        use bacnet_services::wpm::WritePropertyMultipleRequest;
1293
1294        let request = WritePropertyMultipleRequest {
1295            list_of_write_access_specs: specs,
1296        };
1297        let mut buf = BytesMut::new();
1298        request.encode(&mut buf);
1299
1300        let _ = self
1301            .confirmed_request(
1302                destination_mac,
1303                ConfirmedServiceChoice::WRITE_PROPERTY_MULTIPLE,
1304                &buf,
1305            )
1306            .await?;
1307
1308        Ok(())
1309    }
1310
1311    /// Send a WhoIs broadcast to discover devices.
1312    pub async fn who_is(
1313        &self,
1314        low_limit: Option<u32>,
1315        high_limit: Option<u32>,
1316    ) -> Result<(), Error> {
1317        use bacnet_services::who_is::WhoIsRequest;
1318
1319        let request = WhoIsRequest {
1320            low_limit,
1321            high_limit,
1322        };
1323        let mut buf = BytesMut::new();
1324        request.encode(&mut buf);
1325
1326        self.broadcast_global_unconfirmed(UnconfirmedServiceChoice::WHO_IS, &buf)
1327            .await
1328    }
1329
1330    /// Send a directed (unicast) WhoIs to a specific device.
1331    pub async fn who_is_directed(
1332        &self,
1333        destination_mac: &[u8],
1334        low_limit: Option<u32>,
1335        high_limit: Option<u32>,
1336    ) -> Result<(), Error> {
1337        use bacnet_services::who_is::WhoIsRequest;
1338
1339        let request = WhoIsRequest {
1340            low_limit,
1341            high_limit,
1342        };
1343        let mut buf = BytesMut::new();
1344        request.encode(&mut buf);
1345
1346        self.unconfirmed_request(destination_mac, UnconfirmedServiceChoice::WHO_IS, &buf)
1347            .await
1348    }
1349
1350    /// Send a WhoIs broadcast to a specific remote network.
1351    ///
1352    /// Unlike `who_is()` which broadcasts globally (DNET=0xFFFF), this
1353    /// targets a single network number so only devices on that network respond.
1354    pub async fn who_is_network(
1355        &self,
1356        dest_network: u16,
1357        low_limit: Option<u32>,
1358        high_limit: Option<u32>,
1359    ) -> Result<(), Error> {
1360        use bacnet_services::who_is::WhoIsRequest;
1361
1362        let request = WhoIsRequest {
1363            low_limit,
1364            high_limit,
1365        };
1366        let mut buf = BytesMut::new();
1367        request.encode(&mut buf);
1368
1369        self.broadcast_network_unconfirmed(UnconfirmedServiceChoice::WHO_IS, &buf, dest_network)
1370            .await
1371    }
1372
1373    /// Send a WhoHas broadcast to find an object by identifier or name.
1374    pub async fn who_has(
1375        &self,
1376        object: bacnet_services::who_has::WhoHasObject,
1377        low_limit: Option<u32>,
1378        high_limit: Option<u32>,
1379    ) -> Result<(), Error> {
1380        use bacnet_services::who_has::WhoHasRequest;
1381
1382        let request = WhoHasRequest {
1383            low_limit,
1384            high_limit,
1385            object,
1386        };
1387        let mut buf = BytesMut::new();
1388        request.encode(&mut buf)?;
1389
1390        self.broadcast_unconfirmed(UnconfirmedServiceChoice::WHO_HAS, &buf)
1391            .await
1392    }
1393
1394    /// Subscribe to COV notifications for an object on a remote device.
1395    pub async fn subscribe_cov(
1396        &self,
1397        destination_mac: &[u8],
1398        subscriber_process_identifier: u32,
1399        monitored_object_identifier: bacnet_types::primitives::ObjectIdentifier,
1400        confirmed: bool,
1401        lifetime: Option<u32>,
1402    ) -> Result<(), Error> {
1403        use bacnet_services::cov::SubscribeCOVRequest;
1404
1405        let request = SubscribeCOVRequest {
1406            subscriber_process_identifier,
1407            monitored_object_identifier,
1408            issue_confirmed_notifications: Some(confirmed),
1409            lifetime,
1410        };
1411        let mut buf = BytesMut::new();
1412        request.encode(&mut buf);
1413
1414        let _ = self
1415            .confirmed_request(destination_mac, ConfirmedServiceChoice::SUBSCRIBE_COV, &buf)
1416            .await?;
1417
1418        Ok(())
1419    }
1420
1421    /// Cancel a COV subscription on a remote device.
1422    pub async fn unsubscribe_cov(
1423        &self,
1424        destination_mac: &[u8],
1425        subscriber_process_identifier: u32,
1426        monitored_object_identifier: bacnet_types::primitives::ObjectIdentifier,
1427    ) -> Result<(), Error> {
1428        use bacnet_services::cov::SubscribeCOVRequest;
1429
1430        let request = SubscribeCOVRequest {
1431            subscriber_process_identifier,
1432            monitored_object_identifier,
1433            issue_confirmed_notifications: None,
1434            lifetime: None,
1435        };
1436        let mut buf = BytesMut::new();
1437        request.encode(&mut buf);
1438
1439        let _ = self
1440            .confirmed_request(destination_mac, ConfirmedServiceChoice::SUBSCRIBE_COV, &buf)
1441            .await?;
1442
1443        Ok(())
1444    }
1445
1446    /// Delete an object on a remote device.
1447    pub async fn delete_object(
1448        &self,
1449        destination_mac: &[u8],
1450        object_identifier: bacnet_types::primitives::ObjectIdentifier,
1451    ) -> Result<(), Error> {
1452        use bacnet_services::object_mgmt::DeleteObjectRequest;
1453
1454        let request = DeleteObjectRequest { object_identifier };
1455        let mut buf = BytesMut::new();
1456        request.encode(&mut buf);
1457
1458        let _ = self
1459            .confirmed_request(destination_mac, ConfirmedServiceChoice::DELETE_OBJECT, &buf)
1460            .await?;
1461
1462        Ok(())
1463    }
1464
1465    /// Create an object on a remote device.
1466    pub async fn create_object(
1467        &self,
1468        destination_mac: &[u8],
1469        object_specifier: bacnet_services::object_mgmt::ObjectSpecifier,
1470        initial_values: Vec<bacnet_services::common::BACnetPropertyValue>,
1471    ) -> Result<Bytes, Error> {
1472        use bacnet_services::object_mgmt::CreateObjectRequest;
1473
1474        let request = CreateObjectRequest {
1475            object_specifier,
1476            list_of_initial_values: initial_values,
1477        };
1478        let mut buf = BytesMut::new();
1479        request.encode(&mut buf);
1480
1481        self.confirmed_request(destination_mac, ConfirmedServiceChoice::CREATE_OBJECT, &buf)
1482            .await
1483    }
1484
1485    /// Send DeviceCommunicationControl to a remote device.
1486    pub async fn device_communication_control(
1487        &self,
1488        destination_mac: &[u8],
1489        enable_disable: bacnet_types::enums::EnableDisable,
1490        time_duration: Option<u16>,
1491        password: Option<String>,
1492    ) -> Result<(), Error> {
1493        use bacnet_services::device_mgmt::DeviceCommunicationControlRequest;
1494
1495        let request = DeviceCommunicationControlRequest {
1496            time_duration,
1497            enable_disable,
1498            password,
1499        };
1500        let mut buf = BytesMut::new();
1501        request.encode(&mut buf)?;
1502
1503        let _ = self
1504            .confirmed_request(
1505                destination_mac,
1506                ConfirmedServiceChoice::DEVICE_COMMUNICATION_CONTROL,
1507                &buf,
1508            )
1509            .await?;
1510
1511        Ok(())
1512    }
1513
1514    /// Send ReinitializeDevice to a remote device.
1515    pub async fn reinitialize_device(
1516        &self,
1517        destination_mac: &[u8],
1518        reinitialized_state: bacnet_types::enums::ReinitializedState,
1519        password: Option<String>,
1520    ) -> Result<(), Error> {
1521        use bacnet_services::device_mgmt::ReinitializeDeviceRequest;
1522
1523        let request = ReinitializeDeviceRequest {
1524            reinitialized_state,
1525            password,
1526        };
1527        let mut buf = BytesMut::new();
1528        request.encode(&mut buf)?;
1529
1530        let _ = self
1531            .confirmed_request(
1532                destination_mac,
1533                ConfirmedServiceChoice::REINITIALIZE_DEVICE,
1534                &buf,
1535            )
1536            .await?;
1537
1538        Ok(())
1539    }
1540
1541    /// Get event information from a remote device.
1542    pub async fn get_event_information(
1543        &self,
1544        destination_mac: &[u8],
1545        last_received_object_identifier: Option<bacnet_types::primitives::ObjectIdentifier>,
1546    ) -> Result<Bytes, Error> {
1547        use bacnet_services::alarm_event::GetEventInformationRequest;
1548
1549        let request = GetEventInformationRequest {
1550            last_received_object_identifier,
1551        };
1552        let mut buf = BytesMut::new();
1553        request.encode(&mut buf);
1554
1555        self.confirmed_request(
1556            destination_mac,
1557            ConfirmedServiceChoice::GET_EVENT_INFORMATION,
1558            &buf,
1559        )
1560        .await
1561    }
1562
1563    /// Acknowledge an alarm on a remote device.
1564    pub async fn acknowledge_alarm(
1565        &self,
1566        destination_mac: &[u8],
1567        acknowledging_process_identifier: u32,
1568        event_object_identifier: bacnet_types::primitives::ObjectIdentifier,
1569        event_state_acknowledged: u32,
1570        acknowledgment_source: &str,
1571    ) -> Result<(), Error> {
1572        use bacnet_services::alarm_event::AcknowledgeAlarmRequest;
1573
1574        let request = AcknowledgeAlarmRequest {
1575            acknowledging_process_identifier,
1576            event_object_identifier,
1577            event_state_acknowledged,
1578            timestamp: bacnet_types::primitives::BACnetTimeStamp::SequenceNumber(0),
1579            acknowledgment_source: acknowledgment_source.to_string(),
1580        };
1581        let mut buf = BytesMut::new();
1582        request.encode(&mut buf)?;
1583
1584        let _ = self
1585            .confirmed_request(
1586                destination_mac,
1587                ConfirmedServiceChoice::ACKNOWLEDGE_ALARM,
1588                &buf,
1589            )
1590            .await?;
1591
1592        Ok(())
1593    }
1594
1595    /// Read a range of items from a list or log-buffer property.
1596    pub async fn read_range(
1597        &self,
1598        destination_mac: &[u8],
1599        object_identifier: bacnet_types::primitives::ObjectIdentifier,
1600        property_identifier: bacnet_types::enums::PropertyIdentifier,
1601        property_array_index: Option<u32>,
1602        range: Option<bacnet_services::read_range::RangeSpec>,
1603    ) -> Result<bacnet_services::read_range::ReadRangeAck, Error> {
1604        use bacnet_services::read_range::{ReadRangeAck, ReadRangeRequest};
1605
1606        let request = ReadRangeRequest {
1607            object_identifier,
1608            property_identifier,
1609            property_array_index,
1610            range,
1611        };
1612        let mut buf = BytesMut::new();
1613        request.encode(&mut buf);
1614
1615        let response_data = self
1616            .confirmed_request(destination_mac, ConfirmedServiceChoice::READ_RANGE, &buf)
1617            .await?;
1618
1619        ReadRangeAck::decode(&response_data)
1620    }
1621
1622    /// Read file data from a remote device (stream or record access).
1623    pub async fn atomic_read_file(
1624        &self,
1625        destination_mac: &[u8],
1626        file_identifier: bacnet_types::primitives::ObjectIdentifier,
1627        access: bacnet_services::file::FileAccessMethod,
1628    ) -> Result<Bytes, Error> {
1629        use bacnet_services::file::AtomicReadFileRequest;
1630
1631        let request = AtomicReadFileRequest {
1632            file_identifier,
1633            access,
1634        };
1635        let mut buf = BytesMut::new();
1636        request.encode(&mut buf);
1637
1638        self.confirmed_request(
1639            destination_mac,
1640            ConfirmedServiceChoice::ATOMIC_READ_FILE,
1641            &buf,
1642        )
1643        .await
1644    }
1645
1646    /// Write file data to a remote device (stream or record access).
1647    pub async fn atomic_write_file(
1648        &self,
1649        destination_mac: &[u8],
1650        file_identifier: bacnet_types::primitives::ObjectIdentifier,
1651        access: bacnet_services::file::FileWriteAccessMethod,
1652    ) -> Result<Bytes, Error> {
1653        use bacnet_services::file::AtomicWriteFileRequest;
1654
1655        let request = AtomicWriteFileRequest {
1656            file_identifier,
1657            access,
1658        };
1659        let mut buf = BytesMut::new();
1660        request.encode(&mut buf);
1661
1662        self.confirmed_request(
1663            destination_mac,
1664            ConfirmedServiceChoice::ATOMIC_WRITE_FILE,
1665            &buf,
1666        )
1667        .await
1668    }
1669
1670    /// Add elements to a list property on a remote device.
1671    pub async fn add_list_element(
1672        &self,
1673        destination_mac: &[u8],
1674        object_identifier: bacnet_types::primitives::ObjectIdentifier,
1675        property_identifier: bacnet_types::enums::PropertyIdentifier,
1676        property_array_index: Option<u32>,
1677        list_of_elements: Vec<u8>,
1678    ) -> Result<(), Error> {
1679        use bacnet_services::list_manipulation::ListElementRequest;
1680
1681        let request = ListElementRequest {
1682            object_identifier,
1683            property_identifier,
1684            property_array_index,
1685            list_of_elements,
1686        };
1687        let mut buf = BytesMut::new();
1688        request.encode(&mut buf);
1689
1690        let _ = self
1691            .confirmed_request(
1692                destination_mac,
1693                ConfirmedServiceChoice::ADD_LIST_ELEMENT,
1694                &buf,
1695            )
1696            .await?;
1697
1698        Ok(())
1699    }
1700
1701    /// Remove elements from a list property on a remote device.
1702    pub async fn remove_list_element(
1703        &self,
1704        destination_mac: &[u8],
1705        object_identifier: bacnet_types::primitives::ObjectIdentifier,
1706        property_identifier: bacnet_types::enums::PropertyIdentifier,
1707        property_array_index: Option<u32>,
1708        list_of_elements: Vec<u8>,
1709    ) -> Result<(), Error> {
1710        use bacnet_services::list_manipulation::ListElementRequest;
1711
1712        let request = ListElementRequest {
1713            object_identifier,
1714            property_identifier,
1715            property_array_index,
1716            list_of_elements,
1717        };
1718        let mut buf = BytesMut::new();
1719        request.encode(&mut buf);
1720
1721        let _ = self
1722            .confirmed_request(
1723                destination_mac,
1724                ConfirmedServiceChoice::REMOVE_LIST_ELEMENT,
1725                &buf,
1726            )
1727            .await?;
1728
1729        Ok(())
1730    }
1731
1732    /// Send a TimeSynchronization request to a device (Clause 16.10.5).
1733    ///
1734    /// This is an unconfirmed service — no response is expected.
1735    pub async fn time_synchronization(
1736        &self,
1737        destination_mac: &[u8],
1738        date: bacnet_types::primitives::Date,
1739        time: bacnet_types::primitives::Time,
1740    ) -> Result<(), Error> {
1741        use bacnet_services::device_mgmt::TimeSynchronizationRequest;
1742
1743        let request = TimeSynchronizationRequest { date, time };
1744        let mut buf = BytesMut::new();
1745        request.encode(&mut buf);
1746
1747        self.unconfirmed_request(
1748            destination_mac,
1749            UnconfirmedServiceChoice::TIME_SYNCHRONIZATION,
1750            &buf,
1751        )
1752        .await
1753    }
1754
1755    /// Send a UTCTimeSynchronization request to a device (Clause 16.10.6).
1756    ///
1757    /// This is an unconfirmed service — no response is expected.
1758    pub async fn utc_time_synchronization(
1759        &self,
1760        destination_mac: &[u8],
1761        date: bacnet_types::primitives::Date,
1762        time: bacnet_types::primitives::Time,
1763    ) -> Result<(), Error> {
1764        use bacnet_services::device_mgmt::TimeSynchronizationRequest;
1765
1766        let request = TimeSynchronizationRequest { date, time };
1767        let mut buf = BytesMut::new();
1768        request.encode(&mut buf);
1769
1770        self.unconfirmed_request(
1771            destination_mac,
1772            UnconfirmedServiceChoice::UTC_TIME_SYNCHRONIZATION,
1773            &buf,
1774        )
1775        .await
1776    }
1777
1778    /// Get a receiver for incoming COV notifications.
1779    ///
1780    /// Can be called multiple times — each call returns a new independent
1781    /// receiver that gets all notifications from that point forward.
1782    pub fn cov_notifications(&self) -> broadcast::Receiver<COVNotificationRequest> {
1783        self.cov_tx.subscribe()
1784    }
1785
1786    // -----------------------------------------------------------------------
1787    // Device discovery
1788    // -----------------------------------------------------------------------
1789
1790    /// Get a snapshot of all discovered devices (from IAm responses).
1791    pub async fn discovered_devices(&self) -> Vec<DiscoveredDevice> {
1792        self.device_table.lock().await.all()
1793    }
1794
1795    /// Look up a discovered device by instance number.
1796    pub async fn get_device(&self, instance: u32) -> Option<DiscoveredDevice> {
1797        self.device_table.lock().await.get(instance).cloned()
1798    }
1799
1800    /// Clear the discovered devices table.
1801    pub async fn clear_devices(&self) {
1802        self.device_table.lock().await.clear();
1803    }
1804
1805    /// Stop the client, aborting the dispatch task.
1806    pub async fn stop(&mut self) -> Result<(), Error> {
1807        if let Some(task) = self.dispatch_task.take() {
1808            task.abort();
1809            let _ = task.await;
1810        }
1811        // Network/transport cleanup happens when the Arc is dropped.
1812        Ok(())
1813    }
1814}
1815
1816#[cfg(test)]
1817mod tests {
1818    use super::*;
1819    use bacnet_encoding::apdu::{ComplexAck, SimpleAck};
1820    use std::net::Ipv4Addr;
1821    use tokio::time::Duration;
1822
1823    /// Helper: build a client on loopback with ephemeral port and short timeout.
1824    async fn make_client() -> BACnetClient<BipTransport> {
1825        BACnetClient::builder()
1826            .interface(Ipv4Addr::LOCALHOST)
1827            .port(0)
1828            .apdu_timeout_ms(2000)
1829            .build()
1830            .await
1831            .unwrap()
1832    }
1833
1834    #[tokio::test]
1835    async fn client_start_stop() {
1836        let mut client = make_client().await;
1837        assert!(!client.local_mac().is_empty());
1838        client.stop().await.unwrap();
1839    }
1840
1841    #[tokio::test]
1842    async fn confirmed_request_simple_ack() {
1843        let mut client_a = make_client().await;
1844
1845        // Create a second network layer to act as "server B"
1846        let transport_b = BipTransport::new(Ipv4Addr::LOCALHOST, 0, Ipv4Addr::BROADCAST);
1847        let mut net_b = NetworkLayer::new(transport_b);
1848        let mut rx_b = net_b.start().await.unwrap();
1849        let b_mac = net_b.local_mac().to_vec();
1850
1851        // Spawn a task that receives the request and sends back SimpleAck
1852        let b_handle = tokio::spawn(async move {
1853            let received = timeout(Duration::from_secs(2), rx_b.recv())
1854                .await
1855                .expect("B timed out")
1856                .expect("B channel closed");
1857
1858            let decoded = apdu::decode_apdu(received.apdu.clone()).unwrap();
1859            if let Apdu::ConfirmedRequest(req) = decoded {
1860                let ack = Apdu::SimpleAck(SimpleAck {
1861                    invoke_id: req.invoke_id,
1862                    service_choice: req.service_choice,
1863                });
1864                let mut buf = BytesMut::new();
1865                encode_apdu(&mut buf, &ack);
1866                net_b
1867                    .send_apdu(&buf, &received.source_mac, false, NetworkPriority::NORMAL)
1868                    .await
1869                    .unwrap();
1870            }
1871            net_b.stop().await.unwrap();
1872        });
1873
1874        let result = client_a
1875            .confirmed_request(
1876                &b_mac,
1877                ConfirmedServiceChoice::WRITE_PROPERTY,
1878                &[0x01, 0x02],
1879            )
1880            .await;
1881
1882        assert!(result.is_ok());
1883        let response = result.unwrap();
1884        assert!(response.is_empty()); // SimpleAck has no service data
1885
1886        b_handle.await.unwrap();
1887        client_a.stop().await.unwrap();
1888    }
1889
1890    #[tokio::test]
1891    async fn confirmed_request_complex_ack() {
1892        let mut client_a = make_client().await;
1893
1894        let transport_b = BipTransport::new(Ipv4Addr::LOCALHOST, 0, Ipv4Addr::BROADCAST);
1895        let mut net_b = NetworkLayer::new(transport_b);
1896        let mut rx_b = net_b.start().await.unwrap();
1897        let b_mac = net_b.local_mac().to_vec();
1898
1899        let b_handle = tokio::spawn(async move {
1900            let received = timeout(Duration::from_secs(2), rx_b.recv())
1901                .await
1902                .unwrap()
1903                .unwrap();
1904
1905            let decoded = apdu::decode_apdu(received.apdu.clone()).unwrap();
1906            if let Apdu::ConfirmedRequest(req) = decoded {
1907                let ack = Apdu::ComplexAck(ComplexAck {
1908                    segmented: false,
1909                    more_follows: false,
1910                    invoke_id: req.invoke_id,
1911                    sequence_number: None,
1912                    proposed_window_size: None,
1913                    service_choice: req.service_choice,
1914                    service_ack: Bytes::from_static(&[0xDE, 0xAD, 0xBE, 0xEF]),
1915                });
1916                let mut buf = BytesMut::new();
1917                encode_apdu(&mut buf, &ack);
1918                net_b
1919                    .send_apdu(&buf, &received.source_mac, false, NetworkPriority::NORMAL)
1920                    .await
1921                    .unwrap();
1922            }
1923            net_b.stop().await.unwrap();
1924        });
1925
1926        let result = client_a
1927            .confirmed_request(&b_mac, ConfirmedServiceChoice::READ_PROPERTY, &[0x01])
1928            .await;
1929
1930        assert!(result.is_ok());
1931        assert_eq!(result.unwrap(), vec![0xDE, 0xAD, 0xBE, 0xEF]);
1932
1933        b_handle.await.unwrap();
1934        client_a.stop().await.unwrap();
1935    }
1936
1937    #[tokio::test]
1938    async fn confirmed_request_timeout() {
1939        let mut client = make_client().await;
1940        // Send to a non-existent address — should timeout
1941        let fake_mac = vec![10, 99, 99, 99, 0xBA, 0xC0];
1942        let result = client
1943            .confirmed_request(&fake_mac, ConfirmedServiceChoice::READ_PROPERTY, &[0x01])
1944            .await;
1945        assert!(result.is_err());
1946        client.stop().await.unwrap();
1947    }
1948
1949    #[tokio::test]
1950    async fn segmented_complex_ack_reassembly() {
1951        let mut client = make_client().await;
1952
1953        let transport_b = BipTransport::new(Ipv4Addr::LOCALHOST, 0, Ipv4Addr::BROADCAST);
1954        let mut net_b = NetworkLayer::new(transport_b);
1955        let mut rx_b = net_b.start().await.unwrap();
1956        let b_mac = net_b.local_mac().to_vec();
1957
1958        // Server B: receive request, respond with 3-segment ComplexAck.
1959        // Wait for SegmentAck after each segment before sending the next.
1960        let b_handle = tokio::spawn(async move {
1961            // Receive the ConfirmedRequest
1962            let received = timeout(Duration::from_secs(2), rx_b.recv())
1963                .await
1964                .unwrap()
1965                .unwrap();
1966
1967            let decoded = apdu::decode_apdu(received.apdu.clone()).unwrap();
1968            let invoke_id = if let Apdu::ConfirmedRequest(req) = decoded {
1969                req.invoke_id
1970            } else {
1971                panic!("Expected ConfirmedRequest");
1972            };
1973
1974            let service_choice = ConfirmedServiceChoice::READ_PROPERTY;
1975            let segments: Vec<Bytes> = vec![
1976                Bytes::from_static(&[0x01, 0x02, 0x03]),
1977                Bytes::from_static(&[0x04, 0x05, 0x06]),
1978                Bytes::from_static(&[0x07, 0x08]),
1979            ];
1980
1981            for (i, seg) in segments.iter().enumerate() {
1982                let is_last = i == segments.len() - 1;
1983                let ack = Apdu::ComplexAck(ComplexAck {
1984                    segmented: true,
1985                    more_follows: !is_last,
1986                    invoke_id,
1987                    sequence_number: Some(i as u8),
1988                    proposed_window_size: Some(1),
1989                    service_choice,
1990                    service_ack: seg.clone(),
1991                });
1992                let mut buf = BytesMut::new();
1993                encode_apdu(&mut buf, &ack);
1994                net_b
1995                    .send_apdu(&buf, &received.source_mac, false, NetworkPriority::NORMAL)
1996                    .await
1997                    .unwrap();
1998
1999                // Wait for SegmentAck from client
2000                let seg_ack_msg = timeout(Duration::from_secs(2), rx_b.recv())
2001                    .await
2002                    .unwrap()
2003                    .unwrap();
2004                let decoded = apdu::decode_apdu(seg_ack_msg.apdu.clone()).unwrap();
2005                if let Apdu::SegmentAck(sa) = decoded {
2006                    assert_eq!(sa.invoke_id, invoke_id);
2007                    assert_eq!(sa.sequence_number, i as u8);
2008                } else {
2009                    panic!("Expected SegmentAck, got {:?}", decoded);
2010                }
2011            }
2012
2013            net_b.stop().await.unwrap();
2014        });
2015
2016        // Client sends a request and should receive the reassembled response
2017        let result = client
2018            .confirmed_request(&b_mac, ConfirmedServiceChoice::READ_PROPERTY, &[0x01])
2019            .await;
2020
2021        assert!(result.is_ok());
2022        assert_eq!(
2023            result.unwrap(),
2024            vec![0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08]
2025        );
2026
2027        b_handle.await.unwrap();
2028        client.stop().await.unwrap();
2029    }
2030
2031    #[tokio::test]
2032    async fn segmented_confirmed_request_sends_segments() {
2033        // Client with max_apdu_length=50 → max segment payload = 44 bytes.
2034        // Any service_data > 46 bytes will trigger segmentation.
2035        let mut client = BACnetClient::builder()
2036            .interface(Ipv4Addr::LOCALHOST)
2037            .port(0)
2038            .apdu_timeout_ms(5000)
2039            .max_apdu_length(50)
2040            .build()
2041            .await
2042            .unwrap();
2043
2044        let transport_b = BipTransport::new(Ipv4Addr::LOCALHOST, 0, Ipv4Addr::BROADCAST);
2045        let mut net_b = NetworkLayer::new(transport_b);
2046        let mut rx_b = net_b.start().await.unwrap();
2047        let b_mac = net_b.local_mac().to_vec();
2048
2049        // 100 bytes of service data → ceil(100/44) = 3 segments (44 + 44 + 12)
2050        let service_data: Vec<u8> = (0u8..100).collect();
2051        let expected_data = service_data.clone();
2052
2053        let b_handle = tokio::spawn(async move {
2054            let mut all_service_data = Vec::new();
2055            let mut client_mac;
2056            let mut invoke_id;
2057
2058            loop {
2059                let received = timeout(Duration::from_secs(3), rx_b.recv())
2060                    .await
2061                    .expect("server timed out waiting for segment")
2062                    .expect("server channel closed");
2063
2064                let decoded = apdu::decode_apdu(received.apdu.clone()).unwrap();
2065                if let Apdu::ConfirmedRequest(req) = decoded {
2066                    assert!(req.segmented, "expected segmented request");
2067                    invoke_id = req.invoke_id;
2068                    client_mac = received.source_mac.clone();
2069                    let seq = req.sequence_number.unwrap();
2070                    all_service_data.extend_from_slice(&req.service_request);
2071
2072                    // Send SegmentAck
2073                    let seg_ack = Apdu::SegmentAck(SegmentAckPdu {
2074                        negative_ack: false,
2075                        sent_by_server: true,
2076                        invoke_id,
2077                        sequence_number: seq,
2078                        actual_window_size: 1,
2079                    });
2080                    let mut buf = BytesMut::new();
2081                    encode_apdu(&mut buf, &seg_ack);
2082                    net_b
2083                        .send_apdu(&buf, &received.source_mac, false, NetworkPriority::NORMAL)
2084                        .await
2085                        .unwrap();
2086
2087                    if !req.more_follows {
2088                        break;
2089                    }
2090                } else {
2091                    panic!("Expected ConfirmedRequest, got {:?}", decoded);
2092                }
2093            }
2094
2095            // All segments received — send SimpleAck
2096            let ack = Apdu::SimpleAck(SimpleAck {
2097                invoke_id,
2098                service_choice: ConfirmedServiceChoice::WRITE_PROPERTY,
2099            });
2100            let mut buf = BytesMut::new();
2101            encode_apdu(&mut buf, &ack);
2102            net_b
2103                .send_apdu(&buf, &client_mac, false, NetworkPriority::NORMAL)
2104                .await
2105                .unwrap();
2106
2107            net_b.stop().await.unwrap();
2108            all_service_data
2109        });
2110
2111        let result = client
2112            .confirmed_request(
2113                &b_mac,
2114                ConfirmedServiceChoice::WRITE_PROPERTY,
2115                &service_data,
2116            )
2117            .await;
2118
2119        assert!(result.is_ok());
2120        assert!(result.unwrap().is_empty()); // SimpleAck has no service data
2121
2122        // Verify server received all service data correctly
2123        let received_data = b_handle.await.unwrap();
2124        assert_eq!(received_data, expected_data);
2125
2126        client.stop().await.unwrap();
2127    }
2128
2129    #[tokio::test]
2130    async fn segmented_request_with_complex_ack_response() {
2131        let mut client = BACnetClient::builder()
2132            .interface(Ipv4Addr::LOCALHOST)
2133            .port(0)
2134            .apdu_timeout_ms(5000)
2135            .max_apdu_length(50)
2136            .build()
2137            .await
2138            .unwrap();
2139
2140        let transport_b = BipTransport::new(Ipv4Addr::LOCALHOST, 0, Ipv4Addr::BROADCAST);
2141        let mut net_b = NetworkLayer::new(transport_b);
2142        let mut rx_b = net_b.start().await.unwrap();
2143        let b_mac = net_b.local_mac().to_vec();
2144
2145        // 60 bytes → 2 segments (44 + 16)
2146        let service_data: Vec<u8> = (0u8..60).collect();
2147
2148        let b_handle = tokio::spawn(async move {
2149            let mut client_mac;
2150            let mut invoke_id;
2151
2152            loop {
2153                let received = timeout(Duration::from_secs(3), rx_b.recv())
2154                    .await
2155                    .unwrap()
2156                    .unwrap();
2157
2158                let decoded = apdu::decode_apdu(received.apdu.clone()).unwrap();
2159                if let Apdu::ConfirmedRequest(req) = decoded {
2160                    invoke_id = req.invoke_id;
2161                    client_mac = received.source_mac.clone();
2162                    let seq = req.sequence_number.unwrap();
2163
2164                    let seg_ack = Apdu::SegmentAck(SegmentAckPdu {
2165                        negative_ack: false,
2166                        sent_by_server: true,
2167                        invoke_id,
2168                        sequence_number: seq,
2169                        actual_window_size: 1,
2170                    });
2171                    let mut buf = BytesMut::new();
2172                    encode_apdu(&mut buf, &seg_ack);
2173                    net_b
2174                        .send_apdu(&buf, &received.source_mac, false, NetworkPriority::NORMAL)
2175                        .await
2176                        .unwrap();
2177
2178                    if !req.more_follows {
2179                        break;
2180                    }
2181                }
2182            }
2183
2184            // Send ComplexAck response
2185            let ack = Apdu::ComplexAck(ComplexAck {
2186                segmented: false,
2187                more_follows: false,
2188                invoke_id,
2189                sequence_number: None,
2190                proposed_window_size: None,
2191                service_choice: ConfirmedServiceChoice::READ_PROPERTY,
2192                service_ack: Bytes::from_static(&[0xCA, 0xFE]),
2193            });
2194            let mut buf = BytesMut::new();
2195            encode_apdu(&mut buf, &ack);
2196            net_b
2197                .send_apdu(&buf, &client_mac, false, NetworkPriority::NORMAL)
2198                .await
2199                .unwrap();
2200
2201            net_b.stop().await.unwrap();
2202        });
2203
2204        let result = client
2205            .confirmed_request(&b_mac, ConfirmedServiceChoice::READ_PROPERTY, &service_data)
2206            .await;
2207
2208        assert!(result.is_ok());
2209        assert_eq!(result.unwrap(), vec![0xCA, 0xFE]);
2210
2211        b_handle.await.unwrap();
2212        client.stop().await.unwrap();
2213    }
2214
2215    #[tokio::test]
2216    async fn segment_overflow_guard() {
2217        // Create a client with small max_apdu_length so segments are tiny.
2218        let mut client = BACnetClient::builder()
2219            .interface(Ipv4Addr::LOCALHOST)
2220            .port(0)
2221            .apdu_timeout_ms(2000)
2222            .max_apdu_length(50)
2223            .build()
2224            .await
2225            .unwrap();
2226
2227        // With max_apdu=50, each segment carries 50-6=44 bytes.
2228        // 256 * 44 = 11,264 bytes → 256 segments, which exceeds the u8 limit.
2229        let huge_payload = vec![0u8; 256 * 44];
2230
2231        // Use a fake destination MAC not in the device table — the client
2232        // will fall back to its own max_apdu_length (50), triggering segmentation.
2233        let fake_mac = vec![10, 99, 99, 99, 0xBA, 0xC0];
2234
2235        let result = client
2236            .confirmed_request(
2237                &fake_mac,
2238                ConfirmedServiceChoice::READ_PROPERTY,
2239                &huge_payload,
2240            )
2241            .await;
2242
2243        assert!(result.is_err());
2244        let err_msg = result.unwrap_err().to_string();
2245        assert!(
2246            err_msg.contains("256 segments"),
2247            "expected segment overflow error, got: {}",
2248            err_msg
2249        );
2250
2251        client.stop().await.unwrap();
2252    }
2253
2254    #[test]
2255    fn seg_receiver_timeout_is_4s() {
2256        assert_eq!(SEG_RECEIVER_TIMEOUT, Duration::from_secs(4));
2257    }
2258}