Skip to main content

bacnet_client/
client.rs

1//! BACnetClient: high-level and low-level request APIs.
2//!
3//! The client owns a NetworkLayer, spawns an APDU dispatch task, and provides
4//! methods for sending confirmed and unconfirmed BACnet requests.
5
6use std::collections::HashMap;
7use std::net::{Ipv4Addr, Ipv6Addr};
8use std::sync::Arc;
9use std::time::Instant;
10
11use bytes::{Bytes, BytesMut};
12use tokio::sync::{broadcast, mpsc, Mutex};
13use tokio::task::JoinHandle;
14use tokio::time::{timeout, Duration};
15use tracing::{debug, warn};
16
17use bacnet_encoding::apdu::{
18    self, encode_apdu, Apdu, ConfirmedRequest as ConfirmedRequestPdu, SegmentAck as SegmentAckPdu,
19    SimpleAck,
20};
21use bacnet_network::layer::NetworkLayer;
22use bacnet_services::cov::COVNotificationRequest;
23use bacnet_transport::bip::BipTransport;
24use bacnet_transport::bip6::Bip6Transport;
25use bacnet_transport::port::TransportPort;
26use bacnet_types::enums::{ConfirmedServiceChoice, NetworkPriority, UnconfirmedServiceChoice};
27use bacnet_types::error::Error;
28use bacnet_types::MacAddr;
29
30use crate::discovery::{DeviceTable, DiscoveredDevice};
31use crate::segmentation::{max_segment_payload, split_payload, SegmentReceiver, SegmentedPduType};
32use crate::tsm::{Tsm, TsmConfig, TsmResponse};
33
/// Client configuration.
///
/// Passed to `BACnetClient::start`. Every builder in this file begins with
/// `ClientConfig::default()` and overrides individual fields.
#[derive(Debug, Clone)]
pub struct ClientConfig {
    /// Local interface to bind.
    ///
    /// Read by `BipClientBuilder::build` when it constructs the transport.
    pub interface: Ipv4Addr,
    /// UDP port (0 for ephemeral).
    pub port: u16,
    /// Directed broadcast address.
    ///
    /// Read by `BipClientBuilder::build` when it constructs the transport.
    pub broadcast_address: Ipv4Addr,
    /// APDU timeout in milliseconds.
    ///
    /// Copied into the transaction state machine's `TsmConfig` at start-up.
    pub apdu_timeout_ms: u64,
    /// Number of APDU retries.
    ///
    /// Copied into the transaction state machine's `TsmConfig` at start-up.
    pub apdu_retries: u8,
    /// Maximum APDU length this client accepts.
    ///
    /// `BACnetClient::start` lowers this to the transport's own
    /// `max_apdu_length()` when the configured value is larger.
    pub max_apdu_length: u16,
    /// Maximum segments this client accepts (None = unspecified).
    pub max_segments: Option<u8>,
    /// Whether this client accepts segmented responses.
    pub segmented_response_accepted: bool,
    /// Proposed window size for segmented transfers (1-127, default 1).
    ///
    /// NOTE(review): the 1-127 range is not validated anywhere in the visible
    /// part of this file.
    pub proposed_window_size: u8,
}
56
57impl Default for ClientConfig {
58    fn default() -> Self {
59        Self {
60            interface: Ipv4Addr::UNSPECIFIED,
61            port: 0xBAC0,
62            broadcast_address: Ipv4Addr::BROADCAST,
63            apdu_timeout_ms: 6000,
64            apdu_retries: 3,
65            max_apdu_length: 1476,
66            max_segments: None,
67            segmented_response_accepted: true,
68            proposed_window_size: 1,
69        }
70    }
71}
72
/// Generic builder for BACnetClient with a pre-built transport.
///
/// Obtained from `BACnetClient::generic_builder()`. The transport must be
/// supplied via `transport()` before `build()` is called; otherwise `build()`
/// returns an error.
pub struct ClientBuilder<T: TransportPort> {
    /// Configuration handed to `BACnetClient::start` by `build()`.
    config: ClientConfig,
    /// Transport to run on; `None` until `transport()` has been called.
    transport: Option<T>,
}
78
79impl<T: TransportPort + 'static> ClientBuilder<T> {
80    /// Set the pre-built transport.
81    pub fn transport(mut self, transport: T) -> Self {
82        self.transport = Some(transport);
83        self
84    }
85
86    /// Set APDU timeout in milliseconds.
87    pub fn apdu_timeout_ms(mut self, ms: u64) -> Self {
88        self.config.apdu_timeout_ms = ms;
89        self
90    }
91
92    /// Set the maximum APDU length this client accepts.
93    pub fn max_apdu_length(mut self, len: u16) -> Self {
94        self.config.max_apdu_length = len;
95        self
96    }
97
98    /// Build and start the client.
99    pub async fn build(self) -> Result<BACnetClient<T>, Error> {
100        let transport = self
101            .transport
102            .ok_or_else(|| Error::Encoding("transport not set on ClientBuilder".into()))?;
103        BACnetClient::start(self.config, transport).await
104    }
105}
106
/// BIP-specific builder that constructs `BipTransport` from interface/port/broadcast fields.
///
/// Obtained from `BACnetClient::bip_builder()` (or its alias `builder()`).
pub struct BipClientBuilder {
    /// Configuration being assembled; its `interface`, `port` and
    /// `broadcast_address` fields are passed to `BipTransport::new` by `build()`.
    config: ClientConfig,
}
111
112impl BipClientBuilder {
113    /// Set the local interface IP.
114    pub fn interface(mut self, ip: Ipv4Addr) -> Self {
115        self.config.interface = ip;
116        self
117    }
118
119    /// Set the UDP port (0 for ephemeral).
120    pub fn port(mut self, port: u16) -> Self {
121        self.config.port = port;
122        self
123    }
124
125    /// Set the directed broadcast address.
126    pub fn broadcast_address(mut self, addr: Ipv4Addr) -> Self {
127        self.config.broadcast_address = addr;
128        self
129    }
130
131    /// Set APDU timeout in milliseconds.
132    pub fn apdu_timeout_ms(mut self, ms: u64) -> Self {
133        self.config.apdu_timeout_ms = ms;
134        self
135    }
136
137    /// Set the maximum APDU length this client accepts.
138    pub fn max_apdu_length(mut self, len: u16) -> Self {
139        self.config.max_apdu_length = len;
140        self
141    }
142
143    /// Build and start the client, constructing a BipTransport from the config.
144    pub async fn build(self) -> Result<BACnetClient<BipTransport>, Error> {
145        let transport = BipTransport::new(
146            self.config.interface,
147            self.config.port,
148            self.config.broadcast_address,
149        );
150        BACnetClient::start(self.config, transport).await
151    }
152}
153
/// In-progress segmented receive state.
///
/// One value is kept per `SegKey` in the dispatch task's session map. It is
/// created when a segmented ComplexAck arrives for a key with no session,
/// removed when the final segment has been processed, and reaped once it has
/// been idle for `SEG_RECEIVER_TIMEOUT`.
struct SegmentedReceiveState {
    /// Collects the segment payloads until they are reassembled.
    receiver: SegmentReceiver,
    /// Next expected sequence number (for gap detection).
    /// Starts at 0 and advances with wrapping arithmetic after each stored segment.
    expected_next_seq: u8,
    /// Timestamp of last received segment (for reaping stale sessions).
    last_activity: Instant,
}
162
/// Timeout for idle segmented reassembly sessions (Clause 9.1.6).
///
/// Sessions are compared against this limit only when the dispatch task
/// receives an APDU, so an idle session can outlive it until the next APDU
/// arrives.
const SEG_RECEIVER_TIMEOUT: Duration = Duration::from_secs(4);

/// Key for tracking in-progress segmented receives: (source_mac, invoke_id).
///
/// The same key shape indexes the `seg_ack_senders` map that routes incoming
/// SegmentAck PDUs to a segmented send in progress.
type SegKey = (MacAddr, u8);
168
/// BACnet client with low-level and high-level request APIs.
///
/// Created by `BACnetClient::start` (usually through one of the builders),
/// which also spawns the APDU dispatch task that shares several of these
/// fields.
pub struct BACnetClient<T: TransportPort> {
    /// Effective configuration; `max_apdu_length` has already been clamped to
    /// the transport's limit.
    config: ClientConfig,
    /// Network layer, shared with the dispatch task.
    network: Arc<NetworkLayer<T>>,
    /// Transaction state machine, shared with the dispatch task, which
    /// completes transactions as responses arrive.
    tsm: Arc<Mutex<Tsm>>,
    /// Devices learned from received IAm messages.
    device_table: Arc<Mutex<DeviceTable>>,
    /// Broadcast channel on which decoded COV notifications are published.
    cov_tx: broadcast::Sender<COVNotificationRequest>,
    /// Handle of the spawned APDU dispatch task.
    dispatch_task: Option<JoinHandle<()>>,
    /// Per-transaction channels to which the dispatch task forwards received
    /// SegmentAck PDUs.
    seg_ack_senders: Arc<Mutex<HashMap<SegKey, mpsc::Sender<SegmentAckPdu>>>>,
    /// Local MAC address read from the network layer after it was started.
    local_mac: MacAddr,
}
180
181impl BACnetClient<BipTransport> {
182    /// Create a BIP-specific builder with interface/port/broadcast fields.
183    pub fn bip_builder() -> BipClientBuilder {
184        BipClientBuilder {
185            config: ClientConfig::default(),
186        }
187    }
188
189    /// Create a BIP-specific builder (alias for backward compatibility).
190    pub fn builder() -> BipClientBuilder {
191        Self::bip_builder()
192    }
193
194    /// Read the Broadcast Distribution Table from a BBMD.
195    pub async fn read_bdt(
196        &self,
197        target: &[u8],
198    ) -> Result<Vec<bacnet_transport::bbmd::BdtEntry>, Error> {
199        self.network.transport().read_bdt(target).await
200    }
201
202    /// Write the Broadcast Distribution Table to a BBMD.
203    pub async fn write_bdt(
204        &self,
205        target: &[u8],
206        entries: &[bacnet_transport::bbmd::BdtEntry],
207    ) -> Result<bacnet_types::enums::BvlcResultCode, Error> {
208        self.network.transport().write_bdt(target, entries).await
209    }
210
211    /// Read the Foreign Device Table from a BBMD.
212    pub async fn read_fdt(
213        &self,
214        target: &[u8],
215    ) -> Result<Vec<bacnet_transport::bbmd::FdtEntryWire>, Error> {
216        self.network.transport().read_fdt(target).await
217    }
218
219    /// Delete a Foreign Device Table entry on a BBMD.
220    pub async fn delete_fdt_entry(
221        &self,
222        target: &[u8],
223        ip: [u8; 4],
224        port: u16,
225    ) -> Result<bacnet_types::enums::BvlcResultCode, Error> {
226        self.network
227            .transport()
228            .delete_fdt_entry(target, ip, port)
229            .await
230    }
231
232    /// Register as a foreign device with a BBMD and return the result code.
233    pub async fn register_foreign_device_bvlc(
234        &self,
235        target: &[u8],
236        ttl: u16,
237    ) -> Result<bacnet_types::enums::BvlcResultCode, Error> {
238        self.network
239            .transport()
240            .register_foreign_device(target, ttl)
241            .await
242    }
243}
244
245impl BACnetClient<Bip6Transport> {
246    /// Create a BIP6-specific builder for BACnet/IPv6 transport.
247    pub fn bip6_builder() -> Bip6ClientBuilder {
248        Bip6ClientBuilder {
249            config: ClientConfig::default(),
250            interface: Ipv6Addr::UNSPECIFIED,
251            device_instance: None,
252        }
253    }
254}
255
/// BIP6-specific builder that constructs `Bip6Transport` from IPv6 interface/port/device-instance.
///
/// Obtained from `BACnetClient::bip6_builder()`.
pub struct Bip6ClientBuilder {
    /// Configuration being assembled; only its `port` is passed to
    /// `Bip6Transport::new`, the rest is handed to `BACnetClient::start`.
    config: ClientConfig,
    /// Local IPv6 address to bind (the builder starts with the unspecified address).
    interface: Ipv6Addr,
    /// Optional device instance forwarded to `Bip6Transport::new`.
    device_instance: Option<u32>,
}
262
263impl Bip6ClientBuilder {
264    /// Set the local IPv6 interface address.
265    pub fn interface(mut self, ip: Ipv6Addr) -> Self {
266        self.interface = ip;
267        self
268    }
269
270    /// Set the UDP port (0 for ephemeral).
271    pub fn port(mut self, port: u16) -> Self {
272        self.config.port = port;
273        self
274    }
275
276    /// Set the device instance for VMAC derivation (Annex U.5).
277    pub fn device_instance(mut self, instance: u32) -> Self {
278        self.device_instance = Some(instance);
279        self
280    }
281
282    /// Set APDU timeout in milliseconds.
283    pub fn apdu_timeout_ms(mut self, ms: u64) -> Self {
284        self.config.apdu_timeout_ms = ms;
285        self
286    }
287
288    /// Set the maximum APDU length this client accepts.
289    pub fn max_apdu_length(mut self, len: u16) -> Self {
290        self.config.max_apdu_length = len;
291        self
292    }
293
294    /// Build and start the client, constructing a Bip6Transport from the config.
295    pub async fn build(self) -> Result<BACnetClient<Bip6Transport>, Error> {
296        let transport = Bip6Transport::new(self.interface, self.config.port, self.device_instance);
297        BACnetClient::start(self.config, transport).await
298    }
299}
300
#[cfg(feature = "sc-tls")]
impl BACnetClient<bacnet_transport::sc::ScTransport<bacnet_transport::sc_tls::TlsWebSocket>> {
    /// Start configuring a client that connects to a BACnet/SC hub.
    ///
    /// The builder begins with the default `ClientConfig`, an empty hub URL,
    /// no TLS configuration, an all-zero VMAC, a 30 000 ms heartbeat interval,
    /// a 60 000 ms heartbeat timeout and reconnection disabled.
    pub fn sc_builder() -> ScClientBuilder {
        const DEFAULT_HEARTBEAT_INTERVAL_MS: u64 = 30_000;
        const DEFAULT_HEARTBEAT_TIMEOUT_MS: u64 = 60_000;

        let config = ClientConfig::default();
        ScClientBuilder {
            config,
            hub_url: String::new(),
            tls_config: None,
            vmac: [0; 6],
            heartbeat_interval_ms: DEFAULT_HEARTBEAT_INTERVAL_MS,
            heartbeat_timeout_ms: DEFAULT_HEARTBEAT_TIMEOUT_MS,
            reconnect: None,
        }
    }
}
316
/// SC-specific client builder.
///
/// Created by [`BACnetClient::sc_builder()`].  Requires the `sc-tls` feature.
/// `tls_config` must be set before `build()`; the other fields have the
/// defaults assigned by `sc_builder()`.
#[cfg(feature = "sc-tls")]
pub struct ScClientBuilder {
    /// Configuration handed to `BACnetClient::start` by `build()`.
    config: ClientConfig,
    /// Hub WebSocket URL passed to `TlsWebSocket::connect` (empty until set).
    hub_url: String,
    /// TLS client configuration; `build()` fails while this is `None`.
    tls_config: Option<std::sync::Arc<tokio_rustls::rustls::ClientConfig>>,
    /// Local VMAC passed to `ScTransport::new` (all zeros until set).
    vmac: bacnet_transport::sc_frame::Vmac,
    /// Heartbeat interval in milliseconds (default 30 000).
    heartbeat_interval_ms: u64,
    /// Heartbeat timeout in milliseconds (default 60 000).
    heartbeat_timeout_ms: u64,
    /// Reconnection settings; `None` leaves the transport's reconnect
    /// behavior unconfigured.
    reconnect: Option<bacnet_transport::sc::ScReconnectConfig>,
}
330
#[cfg(feature = "sc-tls")]
impl ScClientBuilder {
    /// Set the hub WebSocket URL (e.g. `wss://hub.example.com/bacnet`).
    pub fn hub_url(mut self, url: &str) -> Self {
        self.hub_url = url.to_string();
        self
    }

    /// Set the TLS client configuration.
    ///
    /// This is mandatory: `build()` returns an error when it was never set.
    pub fn tls_config(
        mut self,
        config: std::sync::Arc<tokio_rustls::rustls::ClientConfig>,
    ) -> Self {
        self.tls_config = Some(config);
        self
    }

    /// Set the local VMAC address.
    pub fn vmac(mut self, vmac: [u8; 6]) -> Self {
        self.vmac = vmac;
        self
    }

    /// Set the APDU timeout in milliseconds.
    pub fn apdu_timeout_ms(mut self, ms: u64) -> Self {
        self.config.apdu_timeout_ms = ms;
        self
    }

    /// Set the maximum APDU length this client accepts.
    ///
    /// Mirrors the setter offered by the other builders in this file.
    /// `BACnetClient::start` still clamps the value to the transport's limit.
    pub fn max_apdu_length(mut self, len: u16) -> Self {
        self.config.max_apdu_length = len;
        self
    }

    /// Set the heartbeat interval in milliseconds (default 30 000).
    pub fn heartbeat_interval_ms(mut self, ms: u64) -> Self {
        self.heartbeat_interval_ms = ms;
        self
    }

    /// Set the heartbeat timeout in milliseconds (default 60 000).
    pub fn heartbeat_timeout_ms(mut self, ms: u64) -> Self {
        self.heartbeat_timeout_ms = ms;
        self
    }

    /// Enable automatic reconnection with the given configuration.
    pub fn reconnect(mut self, config: bacnet_transport::sc::ScReconnectConfig) -> Self {
        self.reconnect = Some(config);
        self
    }

    /// Connect to the hub and start the client.
    ///
    /// Steps: require a TLS configuration, open the TLS WebSocket to
    /// `hub_url`, wrap it in an `ScTransport` configured with the heartbeat
    /// settings (and reconnect settings when provided), then hand the
    /// transport to `BACnetClient::start`.
    ///
    /// # Errors
    ///
    /// Returns `Error::Encoding` when no TLS configuration was set, and
    /// propagates any error from the WebSocket connect or from
    /// `BACnetClient::start`.
    pub async fn build(
        self,
    ) -> Result<
        BACnetClient<bacnet_transport::sc::ScTransport<bacnet_transport::sc_tls::TlsWebSocket>>,
        Error,
    > {
        let tls_config = self
            .tls_config
            .ok_or_else(|| Error::Encoding("SC client builder: tls_config is required".into()))?;

        let ws = bacnet_transport::sc_tls::TlsWebSocket::connect(&self.hub_url, tls_config).await?;

        let mut transport = bacnet_transport::sc::ScTransport::new(ws, self.vmac)
            .with_heartbeat_interval_ms(self.heartbeat_interval_ms)
            .with_heartbeat_timeout_ms(self.heartbeat_timeout_ms);
        if let Some(rc) = self.reconnect {
            // `with_reconnect` is deprecated upstream but is still the hook
            // this builder exposes, so the lint is silenced locally.
            #[allow(deprecated)]
            {
                transport = transport.with_reconnect(rc);
            }
        }

        BACnetClient::start(self.config, transport).await
    }
}
404
405impl<T: TransportPort + 'static> BACnetClient<T> {
406    /// Create a generic builder that accepts a pre-built transport.
407    pub fn generic_builder() -> ClientBuilder<T> {
408        ClientBuilder {
409            config: ClientConfig::default(),
410            transport: None,
411        }
412    }
413
414    /// Start the client: bind transport, start network layer, spawn dispatch.
415    pub async fn start(mut config: ClientConfig, transport: T) -> Result<Self, Error> {
416        // Clamp max_apdu_length to the transport's physical limit.
417        let transport_max = transport.max_apdu_length();
418        config.max_apdu_length = config.max_apdu_length.min(transport_max);
419
420        let mut network = NetworkLayer::new(transport);
421        let mut apdu_rx = network.start().await?;
422        let local_mac = MacAddr::from_slice(network.local_mac());
423
424        let network = Arc::new(network);
425
426        let tsm_config = TsmConfig {
427            apdu_timeout_ms: config.apdu_timeout_ms,
428            apdu_retries: config.apdu_retries,
429        };
430        let tsm = Arc::new(Mutex::new(Tsm::new(tsm_config)));
431        let tsm_dispatch = Arc::clone(&tsm);
432        let device_table = Arc::new(Mutex::new(DeviceTable::new()));
433        let device_table_dispatch = Arc::clone(&device_table);
434        let network_dispatch = Arc::clone(&network);
435        let (cov_tx, _) = broadcast::channel::<COVNotificationRequest>(64);
436        let cov_tx_dispatch = cov_tx.clone();
437        let seg_ack_senders: Arc<Mutex<HashMap<SegKey, mpsc::Sender<SegmentAckPdu>>>> =
438            Arc::new(Mutex::new(HashMap::new()));
439        let seg_ack_senders_dispatch = Arc::clone(&seg_ack_senders);
440
441        // Spawn APDU dispatch task
442        let dispatch_task = tokio::spawn(async move {
443            // Segmented receive state is task-local — no external sharing needed.
444            let mut seg_state: HashMap<SegKey, SegmentedReceiveState> = HashMap::new();
445
446            while let Some(received) = apdu_rx.recv().await {
447                // Reap timed-out segmented reassembly sessions (Clause 9.1.6).
448                let now = Instant::now();
449                seg_state.retain(|_key, state| {
450                    now.duration_since(state.last_activity) < SEG_RECEIVER_TIMEOUT
451                });
452
453                match apdu::decode_apdu(received.apdu.clone()) {
454                    Ok(decoded) => {
455                        Self::dispatch_apdu(
456                            &tsm_dispatch,
457                            &device_table_dispatch,
458                            &network_dispatch,
459                            &cov_tx_dispatch,
460                            &mut seg_state,
461                            &seg_ack_senders_dispatch,
462                            &received.source_mac,
463                            decoded,
464                        )
465                        .await;
466                    }
467                    Err(e) => {
468                        warn!(error = %e, "Failed to decode received APDU");
469                    }
470                }
471            }
472        });
473
474        Ok(Self {
475            config,
476            network,
477            tsm,
478            device_table,
479            cov_tx,
480            dispatch_task: Some(dispatch_task),
481            seg_ack_senders,
482            local_mac,
483        })
484    }
485
486    /// Dispatch a received APDU to the appropriate TSM handler.
487    #[allow(clippy::too_many_arguments)]
488    async fn dispatch_apdu(
489        tsm: &Arc<Mutex<Tsm>>,
490        device_table: &Arc<Mutex<DeviceTable>>,
491        network: &Arc<NetworkLayer<T>>,
492        cov_tx: &broadcast::Sender<COVNotificationRequest>,
493        seg_state: &mut HashMap<SegKey, SegmentedReceiveState>,
494        seg_ack_senders: &Arc<Mutex<HashMap<SegKey, mpsc::Sender<SegmentAckPdu>>>>,
495        source_mac: &[u8],
496        apdu: Apdu,
497    ) {
498        match apdu {
499            Apdu::SimpleAck(ack) => {
500                debug!(invoke_id = ack.invoke_id, "Received SimpleAck");
501                let mut tsm = tsm.lock().await;
502                tsm.complete_transaction(source_mac, ack.invoke_id, TsmResponse::SimpleAck);
503            }
504            Apdu::ComplexAck(ack) => {
505                if ack.segmented {
506                    Self::handle_segmented_complex_ack(tsm, network, seg_state, source_mac, ack)
507                        .await;
508                } else {
509                    debug!(invoke_id = ack.invoke_id, "Received ComplexAck");
510                    let mut tsm = tsm.lock().await;
511                    tsm.complete_transaction(
512                        source_mac,
513                        ack.invoke_id,
514                        TsmResponse::ComplexAck {
515                            service_data: ack.service_ack,
516                        },
517                    );
518                }
519            }
520            Apdu::Error(err) => {
521                debug!(invoke_id = err.invoke_id, "Received Error PDU");
522                let mut tsm = tsm.lock().await;
523                tsm.complete_transaction(
524                    source_mac,
525                    err.invoke_id,
526                    TsmResponse::Error {
527                        class: err.error_class.to_raw() as u32,
528                        code: err.error_code.to_raw() as u32,
529                    },
530                );
531            }
532            Apdu::Reject(rej) => {
533                debug!(invoke_id = rej.invoke_id, "Received Reject PDU");
534                let mut tsm = tsm.lock().await;
535                tsm.complete_transaction(
536                    source_mac,
537                    rej.invoke_id,
538                    TsmResponse::Reject {
539                        reason: rej.reject_reason.to_raw(),
540                    },
541                );
542            }
543            Apdu::Abort(abt) => {
544                debug!(invoke_id = abt.invoke_id, "Received Abort PDU");
545                let mut tsm = tsm.lock().await;
546                tsm.complete_transaction(
547                    source_mac,
548                    abt.invoke_id,
549                    TsmResponse::Abort {
550                        reason: abt.abort_reason.to_raw(),
551                    },
552                );
553            }
554            Apdu::ConfirmedRequest(req) => {
555                // Handle ConfirmedCOVNotification from a server
556                if req.service_choice == ConfirmedServiceChoice::CONFIRMED_COV_NOTIFICATION {
557                    match COVNotificationRequest::decode(&req.service_request) {
558                        Ok(notification) => {
559                            debug!(
560                                object = ?notification.monitored_object_identifier,
561                                "Received ConfirmedCOVNotification"
562                            );
563                            let _ = cov_tx.send(notification);
564
565                            // Send SimpleAck back
566                            let ack = Apdu::SimpleAck(SimpleAck {
567                                invoke_id: req.invoke_id,
568                                service_choice: req.service_choice,
569                            });
570                            let mut buf = BytesMut::with_capacity(4);
571                            encode_apdu(&mut buf, &ack);
572                            if let Err(e) = network
573                                .send_apdu(&buf, source_mac, false, NetworkPriority::NORMAL)
574                                .await
575                            {
576                                warn!(error = %e, "Failed to send SimpleAck for COV notification");
577                            }
578                        }
579                        Err(e) => {
580                            warn!(error = %e, "Failed to decode ConfirmedCOVNotification");
581                        }
582                    }
583                } else {
584                    debug!(
585                        service = req.service_choice.to_raw(),
586                        "Ignoring ConfirmedRequest (client mode)"
587                    );
588                }
589            }
590            Apdu::UnconfirmedRequest(req) => {
591                if req.service_choice == UnconfirmedServiceChoice::I_AM {
592                    match bacnet_services::who_is::IAmRequest::decode(&req.service_request) {
593                        Ok(i_am) => {
594                            debug!(
595                                device = i_am.object_identifier.instance_number(),
596                                vendor = i_am.vendor_id,
597                                "Received IAm"
598                            );
599                            let device = DiscoveredDevice {
600                                object_identifier: i_am.object_identifier,
601                                mac_address: MacAddr::from_slice(source_mac),
602                                max_apdu_length: i_am.max_apdu_length,
603                                segmentation_supported: i_am.segmentation_supported,
604                                max_segments_accepted: None,
605                                vendor_id: i_am.vendor_id,
606                                last_seen: std::time::Instant::now(),
607                            };
608                            device_table.lock().await.upsert(device);
609                        }
610                        Err(e) => {
611                            warn!(error = %e, "Failed to decode IAm");
612                        }
613                    }
614                } else if req.service_choice
615                    == UnconfirmedServiceChoice::UNCONFIRMED_COV_NOTIFICATION
616                {
617                    match COVNotificationRequest::decode(&req.service_request) {
618                        Ok(notification) => {
619                            debug!(
620                                object = ?notification.monitored_object_identifier,
621                                "Received UnconfirmedCOVNotification"
622                            );
623                            let _ = cov_tx.send(notification);
624                        }
625                        Err(e) => {
626                            warn!(error = %e, "Failed to decode UnconfirmedCOVNotification");
627                        }
628                    }
629                } else {
630                    debug!(
631                        service = req.service_choice.to_raw(),
632                        "Ignoring unconfirmed service in client dispatch"
633                    );
634                }
635            }
636            Apdu::SegmentAck(sa) => {
637                // Forward to the segmented send in progress for this transaction.
638                let key = (MacAddr::from_slice(source_mac), sa.invoke_id);
639                let senders = seg_ack_senders.lock().await;
640                if let Some(tx) = senders.get(&key) {
641                    let _ = tx.try_send(sa);
642                } else {
643                    debug!(
644                        invoke_id = sa.invoke_id,
645                        "Ignoring SegmentAck for unknown transaction"
646                    );
647                }
648            }
649        }
650    }
651
652    /// Handle a segmented ComplexAck: accumulate segments, send SegmentAck,
653    /// reassemble and complete TSM transaction when all segments received.
654    async fn handle_segmented_complex_ack(
655        tsm: &Arc<Mutex<Tsm>>,
656        network: &Arc<NetworkLayer<T>>,
657        seg_state: &mut HashMap<SegKey, SegmentedReceiveState>,
658        source_mac: &[u8],
659        ack: bacnet_encoding::apdu::ComplexAck,
660    ) {
661        let seq = ack.sequence_number.unwrap_or(0);
662        let key = (MacAddr::from_slice(source_mac), ack.invoke_id);
663
664        debug!(
665            invoke_id = ack.invoke_id,
666            seq = seq,
667            more = ack.more_follows,
668            "Received segmented ComplexAck"
669        );
670
671        // Cap concurrent segmented sessions to prevent resource exhaustion
672        const MAX_CONCURRENT_SEG_SESSIONS: usize = 64;
673        if !seg_state.contains_key(&key) && seg_state.len() >= MAX_CONCURRENT_SEG_SESSIONS {
674            warn!(
675                invoke_id = ack.invoke_id,
676                sessions = seg_state.len(),
677                "Max concurrent segmented sessions reached, dropping segment"
678            );
679            return;
680        }
681
682        // Get or create the receive state for this transaction
683        let state = seg_state
684            .entry(key.clone())
685            .or_insert_with(|| SegmentedReceiveState {
686                receiver: SegmentReceiver::new(),
687                expected_next_seq: 0,
688                last_activity: Instant::now(),
689            });
690
691        // Update activity timestamp for this session.
692        state.last_activity = Instant::now();
693
694        // Gap detection: if the sequence number doesn't match expected, send
695        // a negative SegmentAck requesting retransmission from the last
696        // contiguous sequence number.
697        if seq != state.expected_next_seq {
698            warn!(
699                invoke_id = ack.invoke_id,
700                expected = state.expected_next_seq,
701                received = seq,
702                "Segment gap detected, sending negative SegmentAck"
703            );
704            let neg_ack = Apdu::SegmentAck(SegmentAckPdu {
705                negative_ack: true,
706                sent_by_server: false,
707                invoke_id: ack.invoke_id,
708                sequence_number: state.expected_next_seq,
709                actual_window_size: ack.proposed_window_size.unwrap_or(1),
710            });
711            let mut buf = BytesMut::with_capacity(4);
712            encode_apdu(&mut buf, &neg_ack);
713            if let Err(e) = network
714                .send_apdu(&buf, source_mac, false, NetworkPriority::NORMAL)
715                .await
716            {
717                warn!(error = %e, "Failed to send negative SegmentAck");
718            }
719            return;
720        }
721
722        // Store this segment and advance expected sequence
723        if let Err(e) = state.receiver.receive(seq, ack.service_ack) {
724            warn!(error = %e, "Rejecting oversized segment");
725            return;
726        }
727        state.expected_next_seq = seq.wrapping_add(1);
728
729        // Send SegmentAck to acknowledge receipt
730        let seg_ack = Apdu::SegmentAck(SegmentAckPdu {
731            negative_ack: false,
732            sent_by_server: false,
733            invoke_id: ack.invoke_id,
734            sequence_number: seq,
735            actual_window_size: ack.proposed_window_size.unwrap_or(1),
736        });
737        let mut buf = BytesMut::with_capacity(4);
738        encode_apdu(&mut buf, &seg_ack);
739        if let Err(e) = network
740            .send_apdu(&buf, source_mac, false, NetworkPriority::NORMAL)
741            .await
742        {
743            warn!(error = %e, "Failed to send SegmentAck");
744        }
745
746        // If this is the last segment, reassemble and complete the transaction.
747        // Gap detection: reassemble() validates that segments 0..total are all
748        // present; any missing sequence number produces an Err.
749        if !ack.more_follows {
750            let state = seg_state.remove(&key).unwrap();
751            let total = state.receiver.received_count();
752            match state.receiver.reassemble(total) {
753                Ok(service_data) => {
754                    debug!(
755                        invoke_id = ack.invoke_id,
756                        segments = total,
757                        bytes = service_data.len(),
758                        "Reassembled segmented ComplexAck"
759                    );
760                    let mut tsm = tsm.lock().await;
761                    tsm.complete_transaction(
762                        source_mac,
763                        ack.invoke_id,
764                        TsmResponse::ComplexAck {
765                            service_data: Bytes::from(service_data),
766                        },
767                    );
768                }
769                Err(e) => {
770                    warn!(error = %e, "Failed to reassemble segmented ComplexAck");
771                }
772            }
773        }
774    }
775
    /// Get the client's local MAC address.
    ///
    /// This is the value read from the network layer in `start`, right after
    /// the layer was started; it is returned as a borrowed byte slice.
    pub fn local_mac(&self) -> &[u8] {
        &self.local_mac
    }
780
781    // -----------------------------------------------------------------------
782    // Low-level API
783    // -----------------------------------------------------------------------
784
785    /// Send a confirmed request and wait for the response.
786    ///
787    /// Returns the service response data (empty `Vec` for SimpleAck).
788    /// Returns an error on timeout, protocol error, reject, or abort.
789    ///
790    /// Automatically uses segmented transfer when the payload exceeds the
791    /// remote device's max APDU length.
792    pub async fn confirmed_request(
793        &self,
794        destination_mac: &[u8],
795        service_choice: ConfirmedServiceChoice,
796        service_data: &[u8],
797    ) -> Result<Bytes, Error> {
798        // Check if segmentation is needed.
799        // Non-segmented ConfirmedRequest overhead: 4 bytes (type+flags, max-seg/apdu, invoke, service).
800        let unsegmented_apdu_size = 4 + service_data.len();
801        let (remote_max_apdu, remote_max_segments) = {
802            let dt = self.device_table.lock().await;
803            let device = dt.get_by_mac(destination_mac);
804            let max_apdu = device
805                .map(|d| d.max_apdu_length as u16)
806                .unwrap_or(self.config.max_apdu_length);
807            let max_seg = device.and_then(|d| d.max_segments_accepted);
808            (max_apdu, max_seg)
809        };
810        if unsegmented_apdu_size > remote_max_apdu as usize {
811            return self
812                .segmented_confirmed_request(
813                    destination_mac,
814                    service_choice,
815                    service_data,
816                    remote_max_apdu,
817                    remote_max_segments,
818                )
819                .await;
820        }
821
822        // Allocate invoke ID and register transaction
823        let (invoke_id, rx) = {
824            let mut tsm = self.tsm.lock().await;
825            let invoke_id = tsm.allocate_invoke_id(destination_mac).ok_or_else(|| {
826                Error::Encoding("all invoke IDs exhausted for destination".into())
827            })?;
828            let rx = tsm.register_transaction(MacAddr::from_slice(destination_mac), invoke_id);
829            (invoke_id, rx)
830        };
831
832        // Build ConfirmedRequest APDU
833        let pdu = Apdu::ConfirmedRequest(ConfirmedRequestPdu {
834            segmented: false,
835            more_follows: false,
836            segmented_response_accepted: self.config.segmented_response_accepted,
837            max_segments: self.config.max_segments,
838            max_apdu_length: self.config.max_apdu_length,
839            invoke_id,
840            sequence_number: None,
841            proposed_window_size: None,
842            service_choice,
843            service_request: Bytes::copy_from_slice(service_data),
844        });
845
846        let mut buf = BytesMut::with_capacity(6 + service_data.len());
847        encode_apdu(&mut buf, &pdu);
848
849        // Retry loop per Clause 5.4.2: retransmit on timeout up to apdu_retries times.
850        // The invoke_id stays the same across retries. The TSM transaction is NOT
851        // cancelled between retries — only on final timeout or non-timeout error.
852        let timeout_duration = Duration::from_millis(self.config.apdu_timeout_ms);
853        let max_retries = self.config.apdu_retries;
854        let mut attempts: u8 = 0;
855        let mut rx = rx;
856
857        loop {
858            if let Err(e) = self
859                .network
860                .send_apdu(&buf, destination_mac, true, NetworkPriority::NORMAL)
861                .await
862            {
863                // Clean up the invoke ID on send failure to prevent pool exhaustion
864                let mut tsm = self.tsm.lock().await;
865                tsm.cancel_transaction(destination_mac, invoke_id);
866                return Err(e);
867            }
868
869            match timeout(timeout_duration, &mut rx).await {
870                Ok(Ok(response)) => {
871                    // Response received — convert TsmResponse to Result
872                    return match response {
873                        TsmResponse::SimpleAck => Ok(Bytes::new()),
874                        TsmResponse::ComplexAck { service_data } => Ok(service_data),
875                        TsmResponse::Error { class, code } => Err(Error::Protocol { class, code }),
876                        TsmResponse::Reject { reason } => Err(Error::Reject { reason }),
877                        TsmResponse::Abort { reason } => Err(Error::Abort { reason }),
878                    };
879                }
880                Ok(Err(_)) => {
881                    // Channel closed — TSM transaction was cancelled externally
882                    return Err(Error::Encoding("TSM response channel closed".into()));
883                }
884                Err(_timeout) => {
885                    attempts += 1;
886                    if attempts > max_retries {
887                        // Final timeout — cancel TSM transaction and return error
888                        let mut tsm = self.tsm.lock().await;
889                        tsm.cancel_transaction(destination_mac, invoke_id);
890                        return Err(Error::Timeout(timeout_duration));
891                    }
892                    debug!(
893                        invoke_id,
894                        attempt = attempts,
895                        max_retries,
896                        "APDU timeout, retrying confirmed request"
897                    );
898                }
899            }
900        }
901    }
902
903    /// Send a confirmed request using segmented transfer.
904    ///
905    /// Splits the service data into segments, sends them with windowed flow
906    /// control (SegmentAck from server), then waits for the final response.
907    async fn segmented_confirmed_request(
908        &self,
909        destination_mac: &[u8],
910        service_choice: ConfirmedServiceChoice,
911        service_data: &[u8],
912        remote_max_apdu: u16,
913        remote_max_segments: Option<u32>,
914    ) -> Result<Bytes, Error> {
915        let max_seg_size = max_segment_payload(remote_max_apdu, SegmentedPduType::ConfirmedRequest);
916        let segments = split_payload(service_data, max_seg_size);
917        let total_segments = segments.len();
918
919        if total_segments > 255 {
920            return Err(Error::Segmentation(format!(
921                "payload requires {} segments, maximum is 255",
922                total_segments
923            )));
924        }
925
926        if let Some(max_seg) = remote_max_segments {
927            if total_segments > max_seg as usize {
928                return Err(Error::Segmentation(format!(
929                    "request requires {} segments but remote accepts at most {}",
930                    total_segments, max_seg
931                )));
932            }
933        }
934
935        debug!(
936            total_segments,
937            max_seg_size,
938            service_data_len = service_data.len(),
939            "Starting segmented confirmed request"
940        );
941
942        // Allocate invoke ID and register TSM transaction (for final response).
943        let (invoke_id, rx) = {
944            let mut tsm = self.tsm.lock().await;
945            let invoke_id = tsm.allocate_invoke_id(destination_mac).ok_or_else(|| {
946                Error::Encoding("all invoke IDs exhausted for destination".into())
947            })?;
948            let rx = tsm.register_transaction(MacAddr::from_slice(destination_mac), invoke_id);
949            (invoke_id, rx)
950        };
951
952        // Register a channel for receiving SegmentAck PDUs during the send.
953        let (seg_ack_tx, mut seg_ack_rx) = mpsc::channel(16);
954        {
955            let key = (MacAddr::from_slice(destination_mac), invoke_id);
956            self.seg_ack_senders.lock().await.insert(key, seg_ack_tx);
957        }
958
959        let timeout_duration = Duration::from_millis(self.config.apdu_timeout_ms);
960        let max_ack_retries = self.config.apdu_retries;
961        let mut window_size = self.config.proposed_window_size.max(1) as usize;
962        let mut next_seq: usize = 0;
963        let mut neg_ack_retries: u32 = 0;
964        const MAX_NEG_ACK_RETRIES: u32 = 10;
965
966        // Send segments in windows, waiting for SegmentAck after each window.
967        let result = async {
968            while next_seq < total_segments {
969                let window_end = (next_seq + window_size).min(total_segments);
970
971                for (seq, segment_data) in segments[next_seq..window_end]
972                    .iter()
973                    .enumerate()
974                    .map(|(i, s)| (next_seq + i, s))
975                {
976                    let is_last = seq == total_segments - 1;
977                    let pdu = Apdu::ConfirmedRequest(ConfirmedRequestPdu {
978                        segmented: true,
979                        more_follows: !is_last,
980                        segmented_response_accepted: self.config.segmented_response_accepted,
981                        max_segments: self.config.max_segments,
982                        max_apdu_length: self.config.max_apdu_length,
983                        invoke_id,
984                        sequence_number: Some(seq as u8),
985                        proposed_window_size: Some(self.config.proposed_window_size.max(1)),
986                        service_choice,
987                        service_request: segment_data.clone(),
988                    });
989
990                    let mut buf = BytesMut::with_capacity(remote_max_apdu as usize);
991                    encode_apdu(&mut buf, &pdu);
992
993                    self.network
994                        .send_apdu(&buf, destination_mac, true, NetworkPriority::NORMAL)
995                        .await?;
996
997                    debug!(seq, is_last, "Sent segment");
998                }
999
1000                // Wait for SegmentAck with retry logic matching the unsegmented path.
1001                let ack = {
1002                    let mut ack_retries: u8 = 0;
1003                    loop {
1004                        match timeout(timeout_duration, seg_ack_rx.recv()).await {
1005                            Ok(Some(ack)) => break ack,
1006                            Ok(None) => {
1007                                return Err(Error::Encoding("SegmentAck channel closed".into()));
1008                            }
1009                            Err(_timeout) => {
1010                                ack_retries += 1;
1011                                if ack_retries > max_ack_retries {
1012                                    return Err(Error::Timeout(timeout_duration));
1013                                }
1014                                warn!(
1015                                    attempt = ack_retries,
1016                                    "Retransmitting segmented request window"
1017                                );
1018                                // Retransmit the current window.
1019                                for (seq, segment_data) in segments[next_seq..window_end]
1020                                    .iter()
1021                                    .enumerate()
1022                                    .map(|(i, s)| (next_seq + i, s))
1023                                {
1024                                    let is_last = seq == total_segments - 1;
1025                                    let pdu = Apdu::ConfirmedRequest(ConfirmedRequestPdu {
1026                                        segmented: true,
1027                                        more_follows: !is_last,
1028                                        segmented_response_accepted: self
1029                                            .config
1030                                            .segmented_response_accepted,
1031                                        max_segments: self.config.max_segments,
1032                                        max_apdu_length: self.config.max_apdu_length,
1033                                        invoke_id,
1034                                        sequence_number: Some(seq as u8),
1035                                        proposed_window_size: Some(
1036                                            self.config.proposed_window_size.max(1),
1037                                        ),
1038                                        service_choice,
1039                                        service_request: segment_data.clone(),
1040                                    });
1041
1042                                    let mut buf = BytesMut::with_capacity(remote_max_apdu as usize);
1043                                    encode_apdu(&mut buf, &pdu);
1044
1045                                    self.network
1046                                        .send_apdu(
1047                                            &buf,
1048                                            destination_mac,
1049                                            true,
1050                                            NetworkPriority::NORMAL,
1051                                        )
1052                                        .await?;
1053                                }
1054                            }
1055                        }
1056                    }
1057                };
1058
1059                debug!(
1060                    seq = ack.sequence_number,
1061                    negative = ack.negative_ack,
1062                    window = ack.actual_window_size,
1063                    "Received SegmentAck"
1064                );
1065
1066                // Update window size from server's response.
1067                window_size = ack.actual_window_size.max(1) as usize;
1068
1069                if ack.negative_ack {
1070                    neg_ack_retries += 1;
1071                    if neg_ack_retries > MAX_NEG_ACK_RETRIES {
1072                        return Err(Error::Segmentation(
1073                            "too many negative SegmentAck retransmissions".into(),
1074                        ));
1075                    }
1076                    // Server is requesting retransmission from this sequence.
1077                    next_seq = ack.sequence_number as usize;
1078                } else {
1079                    neg_ack_retries = 0;
1080                    // Advance past the acknowledged segment.
1081                    next_seq = ack.sequence_number as usize + 1;
1082                }
1083            }
1084
1085            // All segments sent and acknowledged. Wait for final response via TSM.
1086            timeout(timeout_duration, rx)
1087                .await
1088                .map_err(|_| Error::Timeout(timeout_duration))?
1089                .map_err(|_| Error::Encoding("TSM response channel closed".into()))
1090        }
1091        .await;
1092
1093        // Clean up seg_ack channel regardless of outcome.
1094        {
1095            let key = (MacAddr::from_slice(destination_mac), invoke_id);
1096            self.seg_ack_senders.lock().await.remove(&key);
1097        }
1098
1099        // On error, cancel the TSM transaction.
1100        let response = match result {
1101            Ok(response) => response,
1102            Err(e) => {
1103                let mut tsm = self.tsm.lock().await;
1104                tsm.cancel_transaction(destination_mac, invoke_id);
1105                return Err(e);
1106            }
1107        };
1108
1109        match response {
1110            TsmResponse::SimpleAck => Ok(Bytes::new()),
1111            TsmResponse::ComplexAck { service_data } => Ok(service_data),
1112            TsmResponse::Error { class, code } => Err(Error::Protocol { class, code }),
1113            TsmResponse::Reject { reason } => Err(Error::Reject { reason }),
1114            TsmResponse::Abort { reason } => Err(Error::Abort { reason }),
1115        }
1116    }
1117
1118    /// Send an unconfirmed request (fire-and-forget) to a specific destination.
1119    pub async fn unconfirmed_request(
1120        &self,
1121        destination_mac: &[u8],
1122        service_choice: UnconfirmedServiceChoice,
1123        service_data: &[u8],
1124    ) -> Result<(), Error> {
1125        let pdu = Apdu::UnconfirmedRequest(bacnet_encoding::apdu::UnconfirmedRequest {
1126            service_choice,
1127            service_request: Bytes::copy_from_slice(service_data),
1128        });
1129
1130        let mut buf = BytesMut::with_capacity(2 + service_data.len());
1131        encode_apdu(&mut buf, &pdu);
1132
1133        self.network
1134            .send_apdu(&buf, destination_mac, false, NetworkPriority::NORMAL)
1135            .await
1136    }
1137
1138    /// Broadcast an unconfirmed request on the local network.
1139    pub async fn broadcast_unconfirmed(
1140        &self,
1141        service_choice: UnconfirmedServiceChoice,
1142        service_data: &[u8],
1143    ) -> Result<(), Error> {
1144        let pdu = Apdu::UnconfirmedRequest(bacnet_encoding::apdu::UnconfirmedRequest {
1145            service_choice,
1146            service_request: Bytes::copy_from_slice(service_data),
1147        });
1148
1149        let mut buf = BytesMut::with_capacity(2 + service_data.len());
1150        encode_apdu(&mut buf, &pdu);
1151
1152        self.network
1153            .broadcast_apdu(&buf, false, NetworkPriority::NORMAL)
1154            .await
1155    }
1156
1157    /// Broadcast an unconfirmed request globally (DNET=0xFFFF).
1158    ///
1159    /// Unlike `broadcast_unconfirmed()` which only reaches the local subnet,
1160    /// this sends a global broadcast that routers will forward to all networks.
1161    pub async fn broadcast_global_unconfirmed(
1162        &self,
1163        service_choice: UnconfirmedServiceChoice,
1164        service_data: &[u8],
1165    ) -> Result<(), Error> {
1166        let pdu = Apdu::UnconfirmedRequest(bacnet_encoding::apdu::UnconfirmedRequest {
1167            service_choice,
1168            service_request: Bytes::copy_from_slice(service_data),
1169        });
1170
1171        let mut buf = BytesMut::with_capacity(2 + service_data.len());
1172        encode_apdu(&mut buf, &pdu);
1173
1174        self.network
1175            .broadcast_global_apdu(&buf, false, NetworkPriority::NORMAL)
1176            .await
1177    }
1178
1179    // -----------------------------------------------------------------------
1180    // High-level API
1181    // -----------------------------------------------------------------------
1182
1183    /// Read a property from a remote device.
1184    pub async fn read_property(
1185        &self,
1186        destination_mac: &[u8],
1187        object_identifier: bacnet_types::primitives::ObjectIdentifier,
1188        property_identifier: bacnet_types::enums::PropertyIdentifier,
1189        property_array_index: Option<u32>,
1190    ) -> Result<bacnet_services::read_property::ReadPropertyACK, Error> {
1191        use bacnet_services::read_property::ReadPropertyRequest;
1192
1193        let request = ReadPropertyRequest {
1194            object_identifier,
1195            property_identifier,
1196            property_array_index,
1197        };
1198        let mut buf = BytesMut::new();
1199        request.encode(&mut buf);
1200
1201        let response_data = self
1202            .confirmed_request(destination_mac, ConfirmedServiceChoice::READ_PROPERTY, &buf)
1203            .await?;
1204
1205        bacnet_services::read_property::ReadPropertyACK::decode(&response_data)
1206    }
1207
1208    /// Write a property on a remote device.
1209    pub async fn write_property(
1210        &self,
1211        destination_mac: &[u8],
1212        object_identifier: bacnet_types::primitives::ObjectIdentifier,
1213        property_identifier: bacnet_types::enums::PropertyIdentifier,
1214        property_array_index: Option<u32>,
1215        property_value: Vec<u8>,
1216        priority: Option<u8>,
1217    ) -> Result<(), Error> {
1218        use bacnet_services::write_property::WritePropertyRequest;
1219
1220        let request = WritePropertyRequest {
1221            object_identifier,
1222            property_identifier,
1223            property_array_index,
1224            property_value,
1225            priority,
1226        };
1227        let mut buf = BytesMut::new();
1228        request.encode(&mut buf);
1229
1230        let _ = self
1231            .confirmed_request(
1232                destination_mac,
1233                ConfirmedServiceChoice::WRITE_PROPERTY,
1234                &buf,
1235            )
1236            .await?;
1237
1238        Ok(())
1239    }
1240
1241    /// Read multiple properties from one or more objects on a remote device.
1242    pub async fn read_property_multiple(
1243        &self,
1244        destination_mac: &[u8],
1245        specs: Vec<bacnet_services::rpm::ReadAccessSpecification>,
1246    ) -> Result<bacnet_services::rpm::ReadPropertyMultipleACK, Error> {
1247        use bacnet_services::rpm::{ReadPropertyMultipleACK, ReadPropertyMultipleRequest};
1248
1249        let request = ReadPropertyMultipleRequest {
1250            list_of_read_access_specs: specs,
1251        };
1252        let mut buf = BytesMut::new();
1253        request.encode(&mut buf);
1254
1255        let response_data = self
1256            .confirmed_request(
1257                destination_mac,
1258                ConfirmedServiceChoice::READ_PROPERTY_MULTIPLE,
1259                &buf,
1260            )
1261            .await?;
1262
1263        ReadPropertyMultipleACK::decode(&response_data)
1264    }
1265
1266    /// Write multiple properties on one or more objects on a remote device.
1267    pub async fn write_property_multiple(
1268        &self,
1269        destination_mac: &[u8],
1270        specs: Vec<bacnet_services::wpm::WriteAccessSpecification>,
1271    ) -> Result<(), Error> {
1272        use bacnet_services::wpm::WritePropertyMultipleRequest;
1273
1274        let request = WritePropertyMultipleRequest {
1275            list_of_write_access_specs: specs,
1276        };
1277        let mut buf = BytesMut::new();
1278        request.encode(&mut buf);
1279
1280        let _ = self
1281            .confirmed_request(
1282                destination_mac,
1283                ConfirmedServiceChoice::WRITE_PROPERTY_MULTIPLE,
1284                &buf,
1285            )
1286            .await?;
1287
1288        Ok(())
1289    }
1290
1291    /// Send a WhoIs broadcast to discover devices.
1292    pub async fn who_is(
1293        &self,
1294        low_limit: Option<u32>,
1295        high_limit: Option<u32>,
1296    ) -> Result<(), Error> {
1297        use bacnet_services::who_is::WhoIsRequest;
1298
1299        let request = WhoIsRequest {
1300            low_limit,
1301            high_limit,
1302        };
1303        let mut buf = BytesMut::new();
1304        request.encode(&mut buf);
1305
1306        self.broadcast_global_unconfirmed(UnconfirmedServiceChoice::WHO_IS, &buf)
1307            .await
1308    }
1309
1310    /// Send a directed (unicast) WhoIs to a specific device.
1311    pub async fn who_is_directed(
1312        &self,
1313        destination_mac: &[u8],
1314        low_limit: Option<u32>,
1315        high_limit: Option<u32>,
1316    ) -> Result<(), Error> {
1317        use bacnet_services::who_is::WhoIsRequest;
1318
1319        let request = WhoIsRequest {
1320            low_limit,
1321            high_limit,
1322        };
1323        let mut buf = BytesMut::new();
1324        request.encode(&mut buf);
1325
1326        let pdu = Apdu::UnconfirmedRequest(bacnet_encoding::apdu::UnconfirmedRequest {
1327            service_choice: UnconfirmedServiceChoice::WHO_IS,
1328            service_request: Bytes::copy_from_slice(&buf),
1329        });
1330
1331        let mut apdu_buf = BytesMut::with_capacity(2 + buf.len());
1332        encode_apdu(&mut apdu_buf, &pdu);
1333
1334        self.network
1335            .send_apdu(&apdu_buf, destination_mac, false, NetworkPriority::NORMAL)
1336            .await
1337    }
1338
1339    /// Send a WhoIs broadcast to a specific remote network.
1340    ///
1341    /// Unlike `who_is()` which broadcasts globally (DNET=0xFFFF), this
1342    /// targets a single network number so only devices on that network respond.
1343    pub async fn who_is_network(
1344        &self,
1345        dest_network: u16,
1346        low_limit: Option<u32>,
1347        high_limit: Option<u32>,
1348    ) -> Result<(), Error> {
1349        use bacnet_services::who_is::WhoIsRequest;
1350
1351        let request = WhoIsRequest {
1352            low_limit,
1353            high_limit,
1354        };
1355        let mut buf = BytesMut::new();
1356        request.encode(&mut buf);
1357
1358        let pdu = Apdu::UnconfirmedRequest(bacnet_encoding::apdu::UnconfirmedRequest {
1359            service_choice: UnconfirmedServiceChoice::WHO_IS,
1360            service_request: Bytes::copy_from_slice(&buf),
1361        });
1362
1363        let mut apdu_buf = BytesMut::with_capacity(2 + buf.len());
1364        encode_apdu(&mut apdu_buf, &pdu);
1365
1366        self.network
1367            .broadcast_to_network(&apdu_buf, dest_network, false, NetworkPriority::NORMAL)
1368            .await
1369    }
1370
1371    /// Send a WhoHas broadcast to find an object by identifier or name.
1372    pub async fn who_has(
1373        &self,
1374        object: bacnet_services::who_has::WhoHasObject,
1375        low_limit: Option<u32>,
1376        high_limit: Option<u32>,
1377    ) -> Result<(), Error> {
1378        use bacnet_services::who_has::WhoHasRequest;
1379
1380        let request = WhoHasRequest {
1381            low_limit,
1382            high_limit,
1383            object,
1384        };
1385        let mut buf = BytesMut::new();
1386        request.encode(&mut buf)?;
1387
1388        self.broadcast_unconfirmed(UnconfirmedServiceChoice::WHO_HAS, &buf)
1389            .await
1390    }
1391
1392    /// Subscribe to COV notifications for an object on a remote device.
1393    pub async fn subscribe_cov(
1394        &self,
1395        destination_mac: &[u8],
1396        subscriber_process_identifier: u32,
1397        monitored_object_identifier: bacnet_types::primitives::ObjectIdentifier,
1398        confirmed: bool,
1399        lifetime: Option<u32>,
1400    ) -> Result<(), Error> {
1401        use bacnet_services::cov::SubscribeCOVRequest;
1402
1403        let request = SubscribeCOVRequest {
1404            subscriber_process_identifier,
1405            monitored_object_identifier,
1406            issue_confirmed_notifications: Some(confirmed),
1407            lifetime,
1408        };
1409        let mut buf = BytesMut::new();
1410        request.encode(&mut buf);
1411
1412        let _ = self
1413            .confirmed_request(destination_mac, ConfirmedServiceChoice::SUBSCRIBE_COV, &buf)
1414            .await?;
1415
1416        Ok(())
1417    }
1418
1419    /// Cancel a COV subscription on a remote device.
1420    pub async fn unsubscribe_cov(
1421        &self,
1422        destination_mac: &[u8],
1423        subscriber_process_identifier: u32,
1424        monitored_object_identifier: bacnet_types::primitives::ObjectIdentifier,
1425    ) -> Result<(), Error> {
1426        use bacnet_services::cov::SubscribeCOVRequest;
1427
1428        let request = SubscribeCOVRequest {
1429            subscriber_process_identifier,
1430            monitored_object_identifier,
1431            issue_confirmed_notifications: None,
1432            lifetime: None,
1433        };
1434        let mut buf = BytesMut::new();
1435        request.encode(&mut buf);
1436
1437        let _ = self
1438            .confirmed_request(destination_mac, ConfirmedServiceChoice::SUBSCRIBE_COV, &buf)
1439            .await?;
1440
1441        Ok(())
1442    }
1443
1444    /// Delete an object on a remote device.
1445    pub async fn delete_object(
1446        &self,
1447        destination_mac: &[u8],
1448        object_identifier: bacnet_types::primitives::ObjectIdentifier,
1449    ) -> Result<(), Error> {
1450        use bacnet_services::object_mgmt::DeleteObjectRequest;
1451
1452        let request = DeleteObjectRequest { object_identifier };
1453        let mut buf = BytesMut::new();
1454        request.encode(&mut buf);
1455
1456        let _ = self
1457            .confirmed_request(destination_mac, ConfirmedServiceChoice::DELETE_OBJECT, &buf)
1458            .await?;
1459
1460        Ok(())
1461    }
1462
1463    /// Create an object on a remote device.
1464    pub async fn create_object(
1465        &self,
1466        destination_mac: &[u8],
1467        object_specifier: bacnet_services::object_mgmt::ObjectSpecifier,
1468        initial_values: Vec<bacnet_services::common::BACnetPropertyValue>,
1469    ) -> Result<Bytes, Error> {
1470        use bacnet_services::object_mgmt::CreateObjectRequest;
1471
1472        let request = CreateObjectRequest {
1473            object_specifier,
1474            list_of_initial_values: initial_values,
1475        };
1476        let mut buf = BytesMut::new();
1477        request.encode(&mut buf);
1478
1479        self.confirmed_request(destination_mac, ConfirmedServiceChoice::CREATE_OBJECT, &buf)
1480            .await
1481    }
1482
1483    /// Send DeviceCommunicationControl to a remote device.
1484    pub async fn device_communication_control(
1485        &self,
1486        destination_mac: &[u8],
1487        enable_disable: bacnet_types::enums::EnableDisable,
1488        time_duration: Option<u16>,
1489        password: Option<String>,
1490    ) -> Result<(), Error> {
1491        use bacnet_services::device_mgmt::DeviceCommunicationControlRequest;
1492
1493        let request = DeviceCommunicationControlRequest {
1494            time_duration,
1495            enable_disable,
1496            password,
1497        };
1498        let mut buf = BytesMut::new();
1499        request.encode(&mut buf)?;
1500
1501        let _ = self
1502            .confirmed_request(
1503                destination_mac,
1504                ConfirmedServiceChoice::DEVICE_COMMUNICATION_CONTROL,
1505                &buf,
1506            )
1507            .await?;
1508
1509        Ok(())
1510    }
1511
1512    /// Send ReinitializeDevice to a remote device.
1513    pub async fn reinitialize_device(
1514        &self,
1515        destination_mac: &[u8],
1516        reinitialized_state: bacnet_types::enums::ReinitializedState,
1517        password: Option<String>,
1518    ) -> Result<(), Error> {
1519        use bacnet_services::device_mgmt::ReinitializeDeviceRequest;
1520
1521        let request = ReinitializeDeviceRequest {
1522            reinitialized_state,
1523            password,
1524        };
1525        let mut buf = BytesMut::new();
1526        request.encode(&mut buf)?;
1527
1528        let _ = self
1529            .confirmed_request(
1530                destination_mac,
1531                ConfirmedServiceChoice::REINITIALIZE_DEVICE,
1532                &buf,
1533            )
1534            .await?;
1535
1536        Ok(())
1537    }
1538
1539    /// Get event information from a remote device.
1540    pub async fn get_event_information(
1541        &self,
1542        destination_mac: &[u8],
1543        last_received_object_identifier: Option<bacnet_types::primitives::ObjectIdentifier>,
1544    ) -> Result<Bytes, Error> {
1545        use bacnet_services::alarm_event::GetEventInformationRequest;
1546
1547        let request = GetEventInformationRequest {
1548            last_received_object_identifier,
1549        };
1550        let mut buf = BytesMut::new();
1551        request.encode(&mut buf);
1552
1553        self.confirmed_request(
1554            destination_mac,
1555            ConfirmedServiceChoice::GET_EVENT_INFORMATION,
1556            &buf,
1557        )
1558        .await
1559    }
1560
1561    /// Acknowledge an alarm on a remote device.
1562    pub async fn acknowledge_alarm(
1563        &self,
1564        destination_mac: &[u8],
1565        acknowledging_process_identifier: u32,
1566        event_object_identifier: bacnet_types::primitives::ObjectIdentifier,
1567        event_state_acknowledged: u32,
1568        acknowledgment_source: &str,
1569    ) -> Result<(), Error> {
1570        use bacnet_services::alarm_event::AcknowledgeAlarmRequest;
1571
1572        let request = AcknowledgeAlarmRequest {
1573            acknowledging_process_identifier,
1574            event_object_identifier,
1575            event_state_acknowledged,
1576            timestamp: bacnet_types::primitives::BACnetTimeStamp::SequenceNumber(0),
1577            acknowledgment_source: acknowledgment_source.to_string(),
1578        };
1579        let mut buf = BytesMut::new();
1580        request.encode(&mut buf)?;
1581
1582        let _ = self
1583            .confirmed_request(
1584                destination_mac,
1585                ConfirmedServiceChoice::ACKNOWLEDGE_ALARM,
1586                &buf,
1587            )
1588            .await?;
1589
1590        Ok(())
1591    }
1592
1593    /// Read a range of items from a list or log-buffer property.
1594    pub async fn read_range(
1595        &self,
1596        destination_mac: &[u8],
1597        object_identifier: bacnet_types::primitives::ObjectIdentifier,
1598        property_identifier: bacnet_types::enums::PropertyIdentifier,
1599        property_array_index: Option<u32>,
1600        range: Option<bacnet_services::read_range::RangeSpec>,
1601    ) -> Result<bacnet_services::read_range::ReadRangeAck, Error> {
1602        use bacnet_services::read_range::{ReadRangeAck, ReadRangeRequest};
1603
1604        let request = ReadRangeRequest {
1605            object_identifier,
1606            property_identifier,
1607            property_array_index,
1608            range,
1609        };
1610        let mut buf = BytesMut::new();
1611        request.encode(&mut buf);
1612
1613        let response_data = self
1614            .confirmed_request(destination_mac, ConfirmedServiceChoice::READ_RANGE, &buf)
1615            .await?;
1616
1617        ReadRangeAck::decode(&response_data)
1618    }
1619
1620    /// Read file data from a remote device (stream or record access).
1621    pub async fn atomic_read_file(
1622        &self,
1623        destination_mac: &[u8],
1624        file_identifier: bacnet_types::primitives::ObjectIdentifier,
1625        access: bacnet_services::file::FileAccessMethod,
1626    ) -> Result<Bytes, Error> {
1627        use bacnet_services::file::AtomicReadFileRequest;
1628
1629        let request = AtomicReadFileRequest {
1630            file_identifier,
1631            access,
1632        };
1633        let mut buf = BytesMut::new();
1634        request.encode(&mut buf);
1635
1636        self.confirmed_request(
1637            destination_mac,
1638            ConfirmedServiceChoice::ATOMIC_READ_FILE,
1639            &buf,
1640        )
1641        .await
1642    }
1643
1644    /// Write file data to a remote device (stream or record access).
1645    pub async fn atomic_write_file(
1646        &self,
1647        destination_mac: &[u8],
1648        file_identifier: bacnet_types::primitives::ObjectIdentifier,
1649        access: bacnet_services::file::FileWriteAccessMethod,
1650    ) -> Result<Bytes, Error> {
1651        use bacnet_services::file::AtomicWriteFileRequest;
1652
1653        let request = AtomicWriteFileRequest {
1654            file_identifier,
1655            access,
1656        };
1657        let mut buf = BytesMut::new();
1658        request.encode(&mut buf);
1659
1660        self.confirmed_request(
1661            destination_mac,
1662            ConfirmedServiceChoice::ATOMIC_WRITE_FILE,
1663            &buf,
1664        )
1665        .await
1666    }
1667
1668    /// Add elements to a list property on a remote device.
1669    pub async fn add_list_element(
1670        &self,
1671        destination_mac: &[u8],
1672        object_identifier: bacnet_types::primitives::ObjectIdentifier,
1673        property_identifier: bacnet_types::enums::PropertyIdentifier,
1674        property_array_index: Option<u32>,
1675        list_of_elements: Vec<u8>,
1676    ) -> Result<(), Error> {
1677        use bacnet_services::list_manipulation::ListElementRequest;
1678
1679        let request = ListElementRequest {
1680            object_identifier,
1681            property_identifier,
1682            property_array_index,
1683            list_of_elements,
1684        };
1685        let mut buf = BytesMut::new();
1686        request.encode(&mut buf);
1687
1688        let _ = self
1689            .confirmed_request(
1690                destination_mac,
1691                ConfirmedServiceChoice::ADD_LIST_ELEMENT,
1692                &buf,
1693            )
1694            .await?;
1695
1696        Ok(())
1697    }
1698
1699    /// Remove elements from a list property on a remote device.
1700    pub async fn remove_list_element(
1701        &self,
1702        destination_mac: &[u8],
1703        object_identifier: bacnet_types::primitives::ObjectIdentifier,
1704        property_identifier: bacnet_types::enums::PropertyIdentifier,
1705        property_array_index: Option<u32>,
1706        list_of_elements: Vec<u8>,
1707    ) -> Result<(), Error> {
1708        use bacnet_services::list_manipulation::ListElementRequest;
1709
1710        let request = ListElementRequest {
1711            object_identifier,
1712            property_identifier,
1713            property_array_index,
1714            list_of_elements,
1715        };
1716        let mut buf = BytesMut::new();
1717        request.encode(&mut buf);
1718
1719        let _ = self
1720            .confirmed_request(
1721                destination_mac,
1722                ConfirmedServiceChoice::REMOVE_LIST_ELEMENT,
1723                &buf,
1724            )
1725            .await?;
1726
1727        Ok(())
1728    }
1729
1730    /// Send a TimeSynchronization request to a device (Clause 16.10.5).
1731    ///
1732    /// This is an unconfirmed service — no response is expected.
1733    pub async fn time_synchronization(
1734        &self,
1735        destination_mac: &[u8],
1736        date: bacnet_types::primitives::Date,
1737        time: bacnet_types::primitives::Time,
1738    ) -> Result<(), Error> {
1739        use bacnet_services::device_mgmt::TimeSynchronizationRequest;
1740
1741        let request = TimeSynchronizationRequest { date, time };
1742        let mut buf = BytesMut::new();
1743        request.encode(&mut buf);
1744
1745        self.unconfirmed_request(
1746            destination_mac,
1747            UnconfirmedServiceChoice::TIME_SYNCHRONIZATION,
1748            &buf,
1749        )
1750        .await
1751    }
1752
1753    /// Send a UTCTimeSynchronization request to a device (Clause 16.10.6).
1754    ///
1755    /// This is an unconfirmed service — no response is expected.
1756    pub async fn utc_time_synchronization(
1757        &self,
1758        destination_mac: &[u8],
1759        date: bacnet_types::primitives::Date,
1760        time: bacnet_types::primitives::Time,
1761    ) -> Result<(), Error> {
1762        use bacnet_services::device_mgmt::TimeSynchronizationRequest;
1763
1764        let request = TimeSynchronizationRequest { date, time };
1765        let mut buf = BytesMut::new();
1766        request.encode(&mut buf);
1767
1768        self.unconfirmed_request(
1769            destination_mac,
1770            UnconfirmedServiceChoice::UTC_TIME_SYNCHRONIZATION,
1771            &buf,
1772        )
1773        .await
1774    }
1775
1776    /// Get a receiver for incoming COV notifications.
1777    ///
1778    /// Can be called multiple times — each call returns a new independent
1779    /// receiver that gets all notifications from that point forward.
1780    pub fn cov_notifications(&self) -> broadcast::Receiver<COVNotificationRequest> {
1781        self.cov_tx.subscribe()
1782    }
1783
1784    // -----------------------------------------------------------------------
1785    // Device discovery
1786    // -----------------------------------------------------------------------
1787
1788    /// Get a snapshot of all discovered devices (from IAm responses).
1789    pub async fn discovered_devices(&self) -> Vec<DiscoveredDevice> {
1790        self.device_table.lock().await.all()
1791    }
1792
1793    /// Look up a discovered device by instance number.
1794    pub async fn get_device(&self, instance: u32) -> Option<DiscoveredDevice> {
1795        self.device_table.lock().await.get(instance).cloned()
1796    }
1797
1798    /// Clear the discovered devices table.
1799    pub async fn clear_devices(&self) {
1800        self.device_table.lock().await.clear();
1801    }
1802
1803    /// Stop the client, aborting the dispatch task.
1804    pub async fn stop(&mut self) -> Result<(), Error> {
1805        if let Some(task) = self.dispatch_task.take() {
1806            task.abort();
1807            let _ = task.await;
1808        }
1809        // Network/transport cleanup happens when the Arc is dropped.
1810        Ok(())
1811    }
1812}
1813
1814#[cfg(test)]
1815mod tests {
1816    use super::*;
1817    use bacnet_encoding::apdu::{ComplexAck, SimpleAck};
1818    use std::net::Ipv4Addr;
1819    use tokio::time::Duration;
1820
1821    /// Helper: build a client on loopback with ephemeral port and short timeout.
1822    async fn make_client() -> BACnetClient<BipTransport> {
1823        BACnetClient::builder()
1824            .interface(Ipv4Addr::LOCALHOST)
1825            .port(0)
1826            .apdu_timeout_ms(2000)
1827            .build()
1828            .await
1829            .unwrap()
1830    }
1831
1832    #[tokio::test]
1833    async fn client_start_stop() {
1834        let mut client = make_client().await;
1835        assert!(!client.local_mac().is_empty());
1836        client.stop().await.unwrap();
1837    }
1838
1839    #[tokio::test]
1840    async fn confirmed_request_simple_ack() {
1841        let mut client_a = make_client().await;
1842
1843        // Create a second network layer to act as "server B"
1844        let transport_b = BipTransport::new(Ipv4Addr::LOCALHOST, 0, Ipv4Addr::BROADCAST);
1845        let mut net_b = NetworkLayer::new(transport_b);
1846        let mut rx_b = net_b.start().await.unwrap();
1847        let b_mac = net_b.local_mac().to_vec();
1848
1849        // Spawn a task that receives the request and sends back SimpleAck
1850        let b_handle = tokio::spawn(async move {
1851            let received = timeout(Duration::from_secs(2), rx_b.recv())
1852                .await
1853                .expect("B timed out")
1854                .expect("B channel closed");
1855
1856            let decoded = apdu::decode_apdu(received.apdu.clone()).unwrap();
1857            if let Apdu::ConfirmedRequest(req) = decoded {
1858                let ack = Apdu::SimpleAck(SimpleAck {
1859                    invoke_id: req.invoke_id,
1860                    service_choice: req.service_choice,
1861                });
1862                let mut buf = BytesMut::new();
1863                encode_apdu(&mut buf, &ack);
1864                net_b
1865                    .send_apdu(&buf, &received.source_mac, false, NetworkPriority::NORMAL)
1866                    .await
1867                    .unwrap();
1868            }
1869            net_b.stop().await.unwrap();
1870        });
1871
1872        let result = client_a
1873            .confirmed_request(
1874                &b_mac,
1875                ConfirmedServiceChoice::WRITE_PROPERTY,
1876                &[0x01, 0x02],
1877            )
1878            .await;
1879
1880        assert!(result.is_ok());
1881        let response = result.unwrap();
1882        assert!(response.is_empty()); // SimpleAck has no service data
1883
1884        b_handle.await.unwrap();
1885        client_a.stop().await.unwrap();
1886    }
1887
1888    #[tokio::test]
1889    async fn confirmed_request_complex_ack() {
1890        let mut client_a = make_client().await;
1891
1892        let transport_b = BipTransport::new(Ipv4Addr::LOCALHOST, 0, Ipv4Addr::BROADCAST);
1893        let mut net_b = NetworkLayer::new(transport_b);
1894        let mut rx_b = net_b.start().await.unwrap();
1895        let b_mac = net_b.local_mac().to_vec();
1896
1897        let b_handle = tokio::spawn(async move {
1898            let received = timeout(Duration::from_secs(2), rx_b.recv())
1899                .await
1900                .unwrap()
1901                .unwrap();
1902
1903            let decoded = apdu::decode_apdu(received.apdu.clone()).unwrap();
1904            if let Apdu::ConfirmedRequest(req) = decoded {
1905                let ack = Apdu::ComplexAck(ComplexAck {
1906                    segmented: false,
1907                    more_follows: false,
1908                    invoke_id: req.invoke_id,
1909                    sequence_number: None,
1910                    proposed_window_size: None,
1911                    service_choice: req.service_choice,
1912                    service_ack: Bytes::from_static(&[0xDE, 0xAD, 0xBE, 0xEF]),
1913                });
1914                let mut buf = BytesMut::new();
1915                encode_apdu(&mut buf, &ack);
1916                net_b
1917                    .send_apdu(&buf, &received.source_mac, false, NetworkPriority::NORMAL)
1918                    .await
1919                    .unwrap();
1920            }
1921            net_b.stop().await.unwrap();
1922        });
1923
1924        let result = client_a
1925            .confirmed_request(&b_mac, ConfirmedServiceChoice::READ_PROPERTY, &[0x01])
1926            .await;
1927
1928        assert!(result.is_ok());
1929        assert_eq!(result.unwrap(), vec![0xDE, 0xAD, 0xBE, 0xEF]);
1930
1931        b_handle.await.unwrap();
1932        client_a.stop().await.unwrap();
1933    }
1934
1935    #[tokio::test]
1936    async fn confirmed_request_timeout() {
1937        let mut client = make_client().await;
1938        // Send to a non-existent address — should timeout
1939        let fake_mac = vec![10, 99, 99, 99, 0xBA, 0xC0];
1940        let result = client
1941            .confirmed_request(&fake_mac, ConfirmedServiceChoice::READ_PROPERTY, &[0x01])
1942            .await;
1943        assert!(result.is_err());
1944        client.stop().await.unwrap();
1945    }
1946
1947    #[tokio::test]
1948    async fn segmented_complex_ack_reassembly() {
1949        let mut client = make_client().await;
1950
1951        let transport_b = BipTransport::new(Ipv4Addr::LOCALHOST, 0, Ipv4Addr::BROADCAST);
1952        let mut net_b = NetworkLayer::new(transport_b);
1953        let mut rx_b = net_b.start().await.unwrap();
1954        let b_mac = net_b.local_mac().to_vec();
1955
1956        // Server B: receive request, respond with 3-segment ComplexAck.
1957        // Wait for SegmentAck after each segment before sending the next.
1958        let b_handle = tokio::spawn(async move {
1959            // Receive the ConfirmedRequest
1960            let received = timeout(Duration::from_secs(2), rx_b.recv())
1961                .await
1962                .unwrap()
1963                .unwrap();
1964
1965            let decoded = apdu::decode_apdu(received.apdu.clone()).unwrap();
1966            let invoke_id = if let Apdu::ConfirmedRequest(req) = decoded {
1967                req.invoke_id
1968            } else {
1969                panic!("Expected ConfirmedRequest");
1970            };
1971
1972            let service_choice = ConfirmedServiceChoice::READ_PROPERTY;
1973            let segments: Vec<Bytes> = vec![
1974                Bytes::from_static(&[0x01, 0x02, 0x03]),
1975                Bytes::from_static(&[0x04, 0x05, 0x06]),
1976                Bytes::from_static(&[0x07, 0x08]),
1977            ];
1978
1979            for (i, seg) in segments.iter().enumerate() {
1980                let is_last = i == segments.len() - 1;
1981                let ack = Apdu::ComplexAck(ComplexAck {
1982                    segmented: true,
1983                    more_follows: !is_last,
1984                    invoke_id,
1985                    sequence_number: Some(i as u8),
1986                    proposed_window_size: Some(1),
1987                    service_choice,
1988                    service_ack: seg.clone(),
1989                });
1990                let mut buf = BytesMut::new();
1991                encode_apdu(&mut buf, &ack);
1992                net_b
1993                    .send_apdu(&buf, &received.source_mac, false, NetworkPriority::NORMAL)
1994                    .await
1995                    .unwrap();
1996
1997                // Wait for SegmentAck from client
1998                let seg_ack_msg = timeout(Duration::from_secs(2), rx_b.recv())
1999                    .await
2000                    .unwrap()
2001                    .unwrap();
2002                let decoded = apdu::decode_apdu(seg_ack_msg.apdu.clone()).unwrap();
2003                if let Apdu::SegmentAck(sa) = decoded {
2004                    assert_eq!(sa.invoke_id, invoke_id);
2005                    assert_eq!(sa.sequence_number, i as u8);
2006                } else {
2007                    panic!("Expected SegmentAck, got {:?}", decoded);
2008                }
2009            }
2010
2011            net_b.stop().await.unwrap();
2012        });
2013
2014        // Client sends a request and should receive the reassembled response
2015        let result = client
2016            .confirmed_request(&b_mac, ConfirmedServiceChoice::READ_PROPERTY, &[0x01])
2017            .await;
2018
2019        assert!(result.is_ok());
2020        assert_eq!(
2021            result.unwrap(),
2022            vec![0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08]
2023        );
2024
2025        b_handle.await.unwrap();
2026        client.stop().await.unwrap();
2027    }
2028
2029    #[tokio::test]
2030    async fn segmented_confirmed_request_sends_segments() {
2031        // Client with max_apdu_length=50 → max segment payload = 44 bytes.
2032        // Any service_data > 46 bytes will trigger segmentation.
2033        let mut client = BACnetClient::builder()
2034            .interface(Ipv4Addr::LOCALHOST)
2035            .port(0)
2036            .apdu_timeout_ms(5000)
2037            .max_apdu_length(50)
2038            .build()
2039            .await
2040            .unwrap();
2041
2042        let transport_b = BipTransport::new(Ipv4Addr::LOCALHOST, 0, Ipv4Addr::BROADCAST);
2043        let mut net_b = NetworkLayer::new(transport_b);
2044        let mut rx_b = net_b.start().await.unwrap();
2045        let b_mac = net_b.local_mac().to_vec();
2046
2047        // 100 bytes of service data → ceil(100/44) = 3 segments (44 + 44 + 12)
2048        let service_data: Vec<u8> = (0u8..100).collect();
2049        let expected_data = service_data.clone();
2050
2051        let b_handle = tokio::spawn(async move {
2052            let mut all_service_data = Vec::new();
2053            let mut client_mac;
2054            let mut invoke_id;
2055
2056            loop {
2057                let received = timeout(Duration::from_secs(3), rx_b.recv())
2058                    .await
2059                    .expect("server timed out waiting for segment")
2060                    .expect("server channel closed");
2061
2062                let decoded = apdu::decode_apdu(received.apdu.clone()).unwrap();
2063                if let Apdu::ConfirmedRequest(req) = decoded {
2064                    assert!(req.segmented, "expected segmented request");
2065                    invoke_id = req.invoke_id;
2066                    client_mac = received.source_mac.clone();
2067                    let seq = req.sequence_number.unwrap();
2068                    all_service_data.extend_from_slice(&req.service_request);
2069
2070                    // Send SegmentAck
2071                    let seg_ack = Apdu::SegmentAck(SegmentAckPdu {
2072                        negative_ack: false,
2073                        sent_by_server: true,
2074                        invoke_id,
2075                        sequence_number: seq,
2076                        actual_window_size: 1,
2077                    });
2078                    let mut buf = BytesMut::new();
2079                    encode_apdu(&mut buf, &seg_ack);
2080                    net_b
2081                        .send_apdu(&buf, &received.source_mac, false, NetworkPriority::NORMAL)
2082                        .await
2083                        .unwrap();
2084
2085                    if !req.more_follows {
2086                        break;
2087                    }
2088                } else {
2089                    panic!("Expected ConfirmedRequest, got {:?}", decoded);
2090                }
2091            }
2092
2093            // All segments received — send SimpleAck
2094            let ack = Apdu::SimpleAck(SimpleAck {
2095                invoke_id,
2096                service_choice: ConfirmedServiceChoice::WRITE_PROPERTY,
2097            });
2098            let mut buf = BytesMut::new();
2099            encode_apdu(&mut buf, &ack);
2100            net_b
2101                .send_apdu(&buf, &client_mac, false, NetworkPriority::NORMAL)
2102                .await
2103                .unwrap();
2104
2105            net_b.stop().await.unwrap();
2106            all_service_data
2107        });
2108
2109        let result = client
2110            .confirmed_request(
2111                &b_mac,
2112                ConfirmedServiceChoice::WRITE_PROPERTY,
2113                &service_data,
2114            )
2115            .await;
2116
2117        assert!(result.is_ok());
2118        assert!(result.unwrap().is_empty()); // SimpleAck has no service data
2119
2120        // Verify server received all service data correctly
2121        let received_data = b_handle.await.unwrap();
2122        assert_eq!(received_data, expected_data);
2123
2124        client.stop().await.unwrap();
2125    }
2126
2127    #[tokio::test]
2128    async fn segmented_request_with_complex_ack_response() {
2129        let mut client = BACnetClient::builder()
2130            .interface(Ipv4Addr::LOCALHOST)
2131            .port(0)
2132            .apdu_timeout_ms(5000)
2133            .max_apdu_length(50)
2134            .build()
2135            .await
2136            .unwrap();
2137
2138        let transport_b = BipTransport::new(Ipv4Addr::LOCALHOST, 0, Ipv4Addr::BROADCAST);
2139        let mut net_b = NetworkLayer::new(transport_b);
2140        let mut rx_b = net_b.start().await.unwrap();
2141        let b_mac = net_b.local_mac().to_vec();
2142
2143        // 60 bytes → 2 segments (44 + 16)
2144        let service_data: Vec<u8> = (0u8..60).collect();
2145
2146        let b_handle = tokio::spawn(async move {
2147            let mut client_mac;
2148            let mut invoke_id;
2149
2150            loop {
2151                let received = timeout(Duration::from_secs(3), rx_b.recv())
2152                    .await
2153                    .unwrap()
2154                    .unwrap();
2155
2156                let decoded = apdu::decode_apdu(received.apdu.clone()).unwrap();
2157                if let Apdu::ConfirmedRequest(req) = decoded {
2158                    invoke_id = req.invoke_id;
2159                    client_mac = received.source_mac.clone();
2160                    let seq = req.sequence_number.unwrap();
2161
2162                    let seg_ack = Apdu::SegmentAck(SegmentAckPdu {
2163                        negative_ack: false,
2164                        sent_by_server: true,
2165                        invoke_id,
2166                        sequence_number: seq,
2167                        actual_window_size: 1,
2168                    });
2169                    let mut buf = BytesMut::new();
2170                    encode_apdu(&mut buf, &seg_ack);
2171                    net_b
2172                        .send_apdu(&buf, &received.source_mac, false, NetworkPriority::NORMAL)
2173                        .await
2174                        .unwrap();
2175
2176                    if !req.more_follows {
2177                        break;
2178                    }
2179                }
2180            }
2181
2182            // Send ComplexAck response
2183            let ack = Apdu::ComplexAck(ComplexAck {
2184                segmented: false,
2185                more_follows: false,
2186                invoke_id,
2187                sequence_number: None,
2188                proposed_window_size: None,
2189                service_choice: ConfirmedServiceChoice::READ_PROPERTY,
2190                service_ack: Bytes::from_static(&[0xCA, 0xFE]),
2191            });
2192            let mut buf = BytesMut::new();
2193            encode_apdu(&mut buf, &ack);
2194            net_b
2195                .send_apdu(&buf, &client_mac, false, NetworkPriority::NORMAL)
2196                .await
2197                .unwrap();
2198
2199            net_b.stop().await.unwrap();
2200        });
2201
2202        let result = client
2203            .confirmed_request(&b_mac, ConfirmedServiceChoice::READ_PROPERTY, &service_data)
2204            .await;
2205
2206        assert!(result.is_ok());
2207        assert_eq!(result.unwrap(), vec![0xCA, 0xFE]);
2208
2209        b_handle.await.unwrap();
2210        client.stop().await.unwrap();
2211    }
2212
2213    #[tokio::test]
2214    async fn segment_overflow_guard() {
2215        // Create a client with small max_apdu_length so segments are tiny.
2216        let mut client = BACnetClient::builder()
2217            .interface(Ipv4Addr::LOCALHOST)
2218            .port(0)
2219            .apdu_timeout_ms(2000)
2220            .max_apdu_length(50)
2221            .build()
2222            .await
2223            .unwrap();
2224
2225        // With max_apdu=50, each segment carries 50-6=44 bytes.
2226        // 256 * 44 = 11,264 bytes → 256 segments, which exceeds the u8 limit.
2227        let huge_payload = vec![0u8; 256 * 44];
2228
2229        // Use a fake destination MAC not in the device table — the client
2230        // will fall back to its own max_apdu_length (50), triggering segmentation.
2231        let fake_mac = vec![10, 99, 99, 99, 0xBA, 0xC0];
2232
2233        let result = client
2234            .confirmed_request(
2235                &fake_mac,
2236                ConfirmedServiceChoice::READ_PROPERTY,
2237                &huge_payload,
2238            )
2239            .await;
2240
2241        assert!(result.is_err());
2242        let err_msg = result.unwrap_err().to_string();
2243        assert!(
2244            err_msg.contains("256 segments"),
2245            "expected segment overflow error, got: {}",
2246            err_msg
2247        );
2248
2249        client.stop().await.unwrap();
2250    }
2251
2252    #[test]
2253    fn seg_receiver_timeout_is_4s() {
2254        assert_eq!(SEG_RECEIVER_TIMEOUT, Duration::from_secs(4));
2255    }
2256}