Skip to main content

bacnet_client/
client.rs

1//! BACnetClient: high-level and low-level request APIs.
2//!
3//! The client owns a NetworkLayer, spawns an APDU dispatch task, and provides
4//! methods for sending confirmed and unconfirmed BACnet requests.
5
6use std::collections::HashMap;
7use std::net::Ipv4Addr;
8#[cfg(feature = "ipv6")]
9use std::net::Ipv6Addr;
10use std::sync::Arc;
11use std::time::Instant;
12
13use bytes::{Bytes, BytesMut};
14use tokio::sync::{broadcast, mpsc, Mutex};
15use tokio::task::JoinHandle;
16use tokio::time::{timeout, Duration};
17use tracing::{debug, warn};
18
19use bacnet_encoding::apdu::{
20    self, encode_apdu, AbortPdu, Apdu, ConfirmedRequest as ConfirmedRequestPdu,
21    SegmentAck as SegmentAckPdu, SimpleAck,
22};
23use bacnet_encoding::npdu::NpduAddress;
24use bacnet_network::layer::NetworkLayer;
25use bacnet_services::cov::COVNotificationRequest;
26use bacnet_transport::bip::BipTransport;
27#[cfg(feature = "ipv6")]
28use bacnet_transport::bip6::Bip6Transport;
29use bacnet_transport::port::TransportPort;
30use bacnet_types::enums::{ConfirmedServiceChoice, NetworkPriority, UnconfirmedServiceChoice};
31use bacnet_types::error::Error;
32use bacnet_types::MacAddr;
33
34use crate::discovery::{DeviceTable, DiscoveredDevice};
35use crate::segmentation::{max_segment_payload, split_payload, SegmentReceiver, SegmentedPduType};
36use crate::tsm::{Tsm, TsmConfig, TsmResponse};
37
38/// Client configuration.
39#[derive(Debug, Clone)]
40pub struct ClientConfig {
41    /// Local interface to bind.
42    pub interface: Ipv4Addr,
43    /// UDP port (0 for ephemeral).
44    pub port: u16,
45    /// Directed broadcast address.
46    pub broadcast_address: Ipv4Addr,
47    /// APDU timeout in milliseconds.
48    pub apdu_timeout_ms: u64,
49    /// Number of APDU retries.
50    pub apdu_retries: u8,
51    /// Maximum APDU length this client accepts.
52    pub max_apdu_length: u16,
53    /// Maximum segments this client accepts (None = unspecified).
54    pub max_segments: Option<u8>,
55    /// Whether this client accepts segmented responses.
56    pub segmented_response_accepted: bool,
57    /// Proposed window size for segmented transfers (1-127, default 1).
58    pub proposed_window_size: u8,
59}
60
61impl Default for ClientConfig {
62    fn default() -> Self {
63        Self {
64            interface: Ipv4Addr::UNSPECIFIED,
65            port: 0xBAC0,
66            broadcast_address: Ipv4Addr::BROADCAST,
67            apdu_timeout_ms: 6000,
68            apdu_retries: 3,
69            max_apdu_length: 1476,
70            max_segments: None,
71            segmented_response_accepted: true,
72            proposed_window_size: 1,
73        }
74    }
75}
76
77/// Generic builder for BACnetClient with a pre-built transport.
78pub struct ClientBuilder<T: TransportPort> {
79    config: ClientConfig,
80    transport: Option<T>,
81}
82
83impl<T: TransportPort + 'static> ClientBuilder<T> {
84    /// Set the pre-built transport.
85    pub fn transport(mut self, transport: T) -> Self {
86        self.transport = Some(transport);
87        self
88    }
89
90    /// Set APDU timeout in milliseconds.
91    pub fn apdu_timeout_ms(mut self, ms: u64) -> Self {
92        self.config.apdu_timeout_ms = ms;
93        self
94    }
95
96    /// Set the maximum APDU length this client accepts.
97    pub fn max_apdu_length(mut self, len: u16) -> Self {
98        self.config.max_apdu_length = len;
99        self
100    }
101
102    /// Build and start the client.
103    pub async fn build(self) -> Result<BACnetClient<T>, Error> {
104        let transport = self
105            .transport
106            .ok_or_else(|| Error::Encoding("transport not set on ClientBuilder".into()))?;
107        BACnetClient::start(self.config, transport).await
108    }
109}
110
111/// BIP-specific builder that constructs `BipTransport` from interface/port/broadcast fields.
112pub struct BipClientBuilder {
113    config: ClientConfig,
114}
115
116impl BipClientBuilder {
117    /// Set the local interface IP.
118    pub fn interface(mut self, ip: Ipv4Addr) -> Self {
119        self.config.interface = ip;
120        self
121    }
122
123    /// Set the UDP port (0 for ephemeral).
124    pub fn port(mut self, port: u16) -> Self {
125        self.config.port = port;
126        self
127    }
128
129    /// Set the directed broadcast address.
130    pub fn broadcast_address(mut self, addr: Ipv4Addr) -> Self {
131        self.config.broadcast_address = addr;
132        self
133    }
134
135    /// Set APDU timeout in milliseconds.
136    pub fn apdu_timeout_ms(mut self, ms: u64) -> Self {
137        self.config.apdu_timeout_ms = ms;
138        self
139    }
140
141    /// Set the maximum APDU length this client accepts.
142    pub fn max_apdu_length(mut self, len: u16) -> Self {
143        self.config.max_apdu_length = len;
144        self
145    }
146
147    /// Build and start the client, constructing a BipTransport from the config.
148    pub async fn build(self) -> Result<BACnetClient<BipTransport>, Error> {
149        let transport = BipTransport::new(
150            self.config.interface,
151            self.config.port,
152            self.config.broadcast_address,
153        );
154        BACnetClient::start(self.config, transport).await
155    }
156}
157
158// ---------------------------------------------------------------------------
159// Multi-device batch operation types
160// ---------------------------------------------------------------------------
161
162/// Default concurrency limit for multi-device batch operations.
163const DEFAULT_BATCH_CONCURRENCY: usize = 32;
164
165/// A request to read a single property from a discovered device.
166#[derive(Debug, Clone)]
167pub struct DeviceReadRequest {
168    /// Device instance number (must be in the device table).
169    pub device_instance: u32,
170    /// Object to read from.
171    pub object_identifier: bacnet_types::primitives::ObjectIdentifier,
172    /// Property to read.
173    pub property_identifier: bacnet_types::enums::PropertyIdentifier,
174    /// Optional array index.
175    pub property_array_index: Option<u32>,
176}
177
178/// Result of a single-property read from a device within a batch.
179#[derive(Debug)]
180pub struct DeviceReadResult {
181    /// The device instance this result corresponds to.
182    pub device_instance: u32,
183    /// The read result (Ok = decoded ACK, Err = protocol/timeout error).
184    pub result: Result<bacnet_services::read_property::ReadPropertyACK, Error>,
185}
186
187/// A request to read multiple properties from a discovered device (RPM).
188#[derive(Debug, Clone)]
189pub struct DeviceRpmRequest {
190    /// Device instance number (must be in the device table).
191    pub device_instance: u32,
192    /// ReadAccessSpecifications to send in a single RPM.
193    pub specs: Vec<bacnet_services::rpm::ReadAccessSpecification>,
194}
195
196/// Result of an RPM to a single device within a batch.
197#[derive(Debug)]
198pub struct DeviceRpmResult {
199    /// The device instance this result corresponds to.
200    pub device_instance: u32,
201    /// The RPM result.
202    pub result: Result<bacnet_services::rpm::ReadPropertyMultipleACK, Error>,
203}
204
205/// A request to write a single property on a discovered device.
206#[derive(Debug, Clone)]
207pub struct DeviceWriteRequest {
208    /// Device instance number (must be in the device table).
209    pub device_instance: u32,
210    /// Object to write to.
211    pub object_identifier: bacnet_types::primitives::ObjectIdentifier,
212    /// Property to write.
213    pub property_identifier: bacnet_types::enums::PropertyIdentifier,
214    /// Optional array index.
215    pub property_array_index: Option<u32>,
216    /// Encoded property value bytes.
217    pub property_value: Vec<u8>,
218    /// Optional write priority (1-16).
219    pub priority: Option<u8>,
220}
221
222/// Result of a single-property write to a device within a batch.
223#[derive(Debug)]
224pub struct DeviceWriteResult {
225    /// The device instance this result corresponds to.
226    pub device_instance: u32,
227    /// The write result (Ok = success, Err = protocol/timeout error).
228    pub result: Result<(), Error>,
229}
230
231/// In-progress segmented receive state.
232struct SegmentedReceiveState {
233    receiver: SegmentReceiver,
234    /// Next expected sequence number (for gap detection).
235    expected_next_seq: u8,
236    /// Timestamp of last received segment (for reaping stale sessions).
237    last_activity: Instant,
238    /// Window position counter for per-window SegmentAck (Clause 5.2.2).
239    window_position: u8,
240    /// Proposed window size from the server.
241    proposed_window_size: u8,
242}
243
244/// Timeout for idle segmented reassembly sessions.
245const SEG_RECEIVER_TIMEOUT: Duration = Duration::from_secs(4);
246
247/// Key for tracking in-progress segmented receives: (source_mac, invoke_id).
248type SegKey = (MacAddr, u8);
249
250/// BACnet client with low-level and high-level request APIs.
251pub struct BACnetClient<T: TransportPort> {
252    config: ClientConfig,
253    network: Arc<NetworkLayer<T>>,
254    tsm: Arc<Mutex<Tsm>>,
255    device_table: Arc<Mutex<DeviceTable>>,
256    cov_tx: broadcast::Sender<COVNotificationRequest>,
257    dispatch_task: Option<JoinHandle<()>>,
258    seg_ack_senders: Arc<Mutex<HashMap<SegKey, mpsc::Sender<SegmentAckPdu>>>>,
259    local_mac: MacAddr,
260}
261
262impl BACnetClient<BipTransport> {
263    /// Create a BIP-specific builder with interface/port/broadcast fields.
264    pub fn bip_builder() -> BipClientBuilder {
265        BipClientBuilder {
266            config: ClientConfig::default(),
267        }
268    }
269
270    pub fn builder() -> BipClientBuilder {
271        Self::bip_builder()
272    }
273
274    /// Read the Broadcast Distribution Table from a BBMD.
275    pub async fn read_bdt(
276        &self,
277        target: &[u8],
278    ) -> Result<Vec<bacnet_transport::bbmd::BdtEntry>, Error> {
279        self.network.transport().read_bdt(target).await
280    }
281
282    /// Write the Broadcast Distribution Table to a BBMD.
283    pub async fn write_bdt(
284        &self,
285        target: &[u8],
286        entries: &[bacnet_transport::bbmd::BdtEntry],
287    ) -> Result<bacnet_types::enums::BvlcResultCode, Error> {
288        self.network.transport().write_bdt(target, entries).await
289    }
290
291    /// Read the Foreign Device Table from a BBMD.
292    pub async fn read_fdt(
293        &self,
294        target: &[u8],
295    ) -> Result<Vec<bacnet_transport::bbmd::FdtEntryWire>, Error> {
296        self.network.transport().read_fdt(target).await
297    }
298
299    /// Delete a Foreign Device Table entry on a BBMD.
300    pub async fn delete_fdt_entry(
301        &self,
302        target: &[u8],
303        ip: [u8; 4],
304        port: u16,
305    ) -> Result<bacnet_types::enums::BvlcResultCode, Error> {
306        self.network
307            .transport()
308            .delete_fdt_entry(target, ip, port)
309            .await
310    }
311
312    /// Register as a foreign device with a BBMD and return the result code.
313    pub async fn register_foreign_device_bvlc(
314        &self,
315        target: &[u8],
316        ttl: u16,
317    ) -> Result<bacnet_types::enums::BvlcResultCode, Error> {
318        self.network
319            .transport()
320            .register_foreign_device_bvlc(target, ttl)
321            .await
322    }
323}
324
325#[cfg(feature = "ipv6")]
326impl BACnetClient<Bip6Transport> {
327    /// Create a BIP6-specific builder for BACnet/IPv6 transport.
328    pub fn bip6_builder() -> Bip6ClientBuilder {
329        Bip6ClientBuilder {
330            config: ClientConfig::default(),
331            interface: Ipv6Addr::UNSPECIFIED,
332            device_instance: None,
333        }
334    }
335}
336
337/// BIP6-specific builder that constructs `Bip6Transport` from IPv6 interface/port/device-instance.
338#[cfg(feature = "ipv6")]
339pub struct Bip6ClientBuilder {
340    config: ClientConfig,
341    interface: Ipv6Addr,
342    device_instance: Option<u32>,
343}
344
345#[cfg(feature = "ipv6")]
346impl Bip6ClientBuilder {
347    /// Set the local IPv6 interface address.
348    pub fn interface(mut self, ip: Ipv6Addr) -> Self {
349        self.interface = ip;
350        self
351    }
352
353    /// Set the UDP port (0 for ephemeral).
354    pub fn port(mut self, port: u16) -> Self {
355        self.config.port = port;
356        self
357    }
358
359    /// Set the device instance for VMAC derivation (Annex U.5).
360    pub fn device_instance(mut self, instance: u32) -> Self {
361        self.device_instance = Some(instance);
362        self
363    }
364
365    /// Set APDU timeout in milliseconds.
366    pub fn apdu_timeout_ms(mut self, ms: u64) -> Self {
367        self.config.apdu_timeout_ms = ms;
368        self
369    }
370
371    /// Set the maximum APDU length this client accepts.
372    pub fn max_apdu_length(mut self, len: u16) -> Self {
373        self.config.max_apdu_length = len;
374        self
375    }
376
377    /// Build and start the client, constructing a Bip6Transport from the config.
378    pub async fn build(self) -> Result<BACnetClient<Bip6Transport>, Error> {
379        let transport = Bip6Transport::new(self.interface, self.config.port, self.device_instance);
380        BACnetClient::start(self.config, transport).await
381    }
382}
383
384#[cfg(feature = "sc-tls")]
385impl BACnetClient<bacnet_transport::sc::ScTransport<bacnet_transport::sc_tls::TlsWebSocket>> {
386    /// Create an SC-specific builder that connects to a BACnet/SC hub.
387    pub fn sc_builder() -> ScClientBuilder {
388        ScClientBuilder {
389            config: ClientConfig::default(),
390            hub_url: String::new(),
391            tls_config: None,
392            vmac: [0; 6],
393            heartbeat_interval_ms: 30_000,
394            heartbeat_timeout_ms: 60_000,
395            reconnect: None,
396        }
397    }
398}
399
400/// SC-specific client builder.
401///
402/// Created by [`BACnetClient::sc_builder()`].  Requires the `sc-tls` feature.
403#[cfg(feature = "sc-tls")]
404pub struct ScClientBuilder {
405    config: ClientConfig,
406    hub_url: String,
407    tls_config: Option<std::sync::Arc<tokio_rustls::rustls::ClientConfig>>,
408    vmac: bacnet_transport::sc_frame::Vmac,
409    heartbeat_interval_ms: u64,
410    heartbeat_timeout_ms: u64,
411    reconnect: Option<bacnet_transport::sc::ScReconnectConfig>,
412}
413
414#[cfg(feature = "sc-tls")]
415impl ScClientBuilder {
416    /// Set the hub WebSocket URL (e.g. `wss://hub.example.com/bacnet`).
417    pub fn hub_url(mut self, url: &str) -> Self {
418        self.hub_url = url.to_string();
419        self
420    }
421
422    /// Set the TLS client configuration.
423    pub fn tls_config(
424        mut self,
425        config: std::sync::Arc<tokio_rustls::rustls::ClientConfig>,
426    ) -> Self {
427        self.tls_config = Some(config);
428        self
429    }
430
431    /// Set the local VMAC address.
432    pub fn vmac(mut self, vmac: [u8; 6]) -> Self {
433        self.vmac = vmac;
434        self
435    }
436
437    /// Set the APDU timeout in milliseconds.
438    pub fn apdu_timeout_ms(mut self, ms: u64) -> Self {
439        self.config.apdu_timeout_ms = ms;
440        self
441    }
442
443    /// Set the heartbeat interval in milliseconds (default 30 000).
444    pub fn heartbeat_interval_ms(mut self, ms: u64) -> Self {
445        self.heartbeat_interval_ms = ms;
446        self
447    }
448
449    /// Set the heartbeat timeout in milliseconds (default 60 000).
450    pub fn heartbeat_timeout_ms(mut self, ms: u64) -> Self {
451        self.heartbeat_timeout_ms = ms;
452        self
453    }
454
455    /// Enable automatic reconnection with the given configuration.
456    pub fn reconnect(mut self, config: bacnet_transport::sc::ScReconnectConfig) -> Self {
457        self.reconnect = Some(config);
458        self
459    }
460
461    /// Connect to the hub and start the client.
462    pub async fn build(
463        self,
464    ) -> Result<
465        BACnetClient<bacnet_transport::sc::ScTransport<bacnet_transport::sc_tls::TlsWebSocket>>,
466        Error,
467    > {
468        let tls_config = self
469            .tls_config
470            .ok_or_else(|| Error::Encoding("SC client builder: tls_config is required".into()))?;
471
472        let ws = bacnet_transport::sc_tls::TlsWebSocket::connect(&self.hub_url, tls_config).await?;
473
474        let mut transport = bacnet_transport::sc::ScTransport::new(ws, self.vmac)
475            .with_heartbeat_interval_ms(self.heartbeat_interval_ms)
476            .with_heartbeat_timeout_ms(self.heartbeat_timeout_ms);
477        if let Some(rc) = self.reconnect {
478            #[allow(deprecated)]
479            {
480                transport = transport.with_reconnect(rc);
481            }
482        }
483
484        BACnetClient::start(self.config, transport).await
485    }
486}
487
488/// Routing target for confirmed requests.
489enum ConfirmedTarget<'a> {
490    Local {
491        mac: &'a [u8],
492    },
493    Routed {
494        router_mac: &'a [u8],
495        dest_network: u16,
496        dest_mac: &'a [u8],
497    },
498}
499
500impl<'a> ConfirmedTarget<'a> {
501    /// The MAC used for TSM transaction matching.
502    fn tsm_mac(&self) -> &[u8] {
503        match self {
504            Self::Local { mac } => mac,
505            Self::Routed { router_mac, .. } => router_mac,
506        }
507    }
508}
509
510impl<T: TransportPort + 'static> BACnetClient<T> {
511    /// Create a generic builder that accepts a pre-built transport.
512    pub fn generic_builder() -> ClientBuilder<T> {
513        ClientBuilder {
514            config: ClientConfig::default(),
515            transport: None,
516        }
517    }
518
519    /// Start the client: bind transport, start network layer, spawn dispatch.
520    pub async fn start(mut config: ClientConfig, transport: T) -> Result<Self, Error> {
521        let transport_max = transport.max_apdu_length();
522        config.max_apdu_length = config.max_apdu_length.min(transport_max);
523
524        let mut network = NetworkLayer::new(transport);
525        let mut apdu_rx = network.start().await?;
526        let local_mac = MacAddr::from_slice(network.local_mac());
527
528        let network = Arc::new(network);
529
530        let tsm_config = TsmConfig {
531            apdu_timeout_ms: config.apdu_timeout_ms,
532            apdu_segment_timeout_ms: config.apdu_timeout_ms,
533            apdu_retries: config.apdu_retries,
534        };
535        let tsm = Arc::new(Mutex::new(Tsm::new(tsm_config)));
536        let tsm_dispatch = Arc::clone(&tsm);
537        let device_table = Arc::new(Mutex::new(DeviceTable::new()));
538        let device_table_dispatch = Arc::clone(&device_table);
539        let network_dispatch = Arc::clone(&network);
540        let (cov_tx, _) = broadcast::channel::<COVNotificationRequest>(64);
541        let cov_tx_dispatch = cov_tx.clone();
542        let seg_ack_senders: Arc<Mutex<HashMap<SegKey, mpsc::Sender<SegmentAckPdu>>>> =
543            Arc::new(Mutex::new(HashMap::new()));
544        let seg_ack_senders_dispatch = Arc::clone(&seg_ack_senders);
545        let segmented_response_accepted = config.segmented_response_accepted;
546
547        let dispatch_task = tokio::spawn(async move {
548            let mut seg_state: HashMap<SegKey, SegmentedReceiveState> = HashMap::new();
549            let mut last_device_purge = Instant::now();
550            const DEVICE_PURGE_INTERVAL: Duration = Duration::from_secs(300);
551            const DEVICE_MAX_AGE: Duration = Duration::from_secs(600);
552
553            while let Some(received) = apdu_rx.recv().await {
554                let now = Instant::now();
555
556                // Periodically purge stale device table entries
557                if now.duration_since(last_device_purge) >= DEVICE_PURGE_INTERVAL {
558                    device_table_dispatch
559                        .lock()
560                        .await
561                        .purge_stale(DEVICE_MAX_AGE);
562                    last_device_purge = now;
563                }
564                // Reap stale segmented sessions and send Abort to the server
565                let stale_keys: Vec<SegKey> = seg_state
566                    .iter()
567                    .filter(|(_, state)| {
568                        now.duration_since(state.last_activity) >= SEG_RECEIVER_TIMEOUT
569                    })
570                    .map(|(key, _)| key.clone())
571                    .collect();
572                for key in &stale_keys {
573                    if let Some(_state) = seg_state.remove(key) {
574                        let abort = Apdu::Abort(AbortPdu {
575                            sent_by_server: false,
576                            invoke_id: key.1,
577                            abort_reason: bacnet_types::enums::AbortReason::TSM_TIMEOUT,
578                        });
579                        let mut buf = BytesMut::with_capacity(4);
580                        encode_apdu(&mut buf, &abort);
581                        let _ = network_dispatch
582                            .send_apdu(&buf, &key.0, false, NetworkPriority::NORMAL)
583                            .await;
584                    }
585                }
586
587                match apdu::decode_apdu(received.apdu.clone()) {
588                    Ok(decoded) => {
589                        Self::dispatch_apdu(
590                            &tsm_dispatch,
591                            &device_table_dispatch,
592                            &network_dispatch,
593                            &cov_tx_dispatch,
594                            &mut seg_state,
595                            &seg_ack_senders_dispatch,
596                            &received.source_mac,
597                            &received.source_network,
598                            decoded,
599                            segmented_response_accepted,
600                        )
601                        .await;
602                    }
603                    Err(e) => {
604                        warn!(error = %e, "Failed to decode received APDU");
605                    }
606                }
607            }
608        });
609
610        Ok(Self {
611            config,
612            network,
613            tsm,
614            device_table,
615            cov_tx,
616            dispatch_task: Some(dispatch_task),
617            seg_ack_senders,
618            local_mac,
619        })
620    }
621
622    /// Dispatch a received APDU to the appropriate handler.
623    #[allow(clippy::too_many_arguments)]
624    async fn dispatch_apdu(
625        tsm: &Arc<Mutex<Tsm>>,
626        device_table: &Arc<Mutex<DeviceTable>>,
627        network: &Arc<NetworkLayer<T>>,
628        cov_tx: &broadcast::Sender<COVNotificationRequest>,
629        seg_state: &mut HashMap<SegKey, SegmentedReceiveState>,
630        seg_ack_senders: &Arc<Mutex<HashMap<SegKey, mpsc::Sender<SegmentAckPdu>>>>,
631        source_mac: &[u8],
632        source_network: &Option<NpduAddress>,
633        apdu: Apdu,
634        segmented_response_accepted: bool,
635    ) {
636        match apdu {
637            Apdu::SimpleAck(ack) => {
638                debug!(invoke_id = ack.invoke_id, "Received SimpleAck");
639                let mut tsm = tsm.lock().await;
640                tsm.complete_transaction(source_mac, ack.invoke_id, TsmResponse::SimpleAck);
641            }
642            Apdu::ComplexAck(ack) => {
643                if ack.segmented {
644                    Self::handle_segmented_complex_ack(
645                        tsm,
646                        network,
647                        seg_state,
648                        source_mac,
649                        ack,
650                        segmented_response_accepted,
651                    )
652                    .await;
653                } else {
654                    debug!(invoke_id = ack.invoke_id, "Received ComplexAck");
655                    let mut tsm = tsm.lock().await;
656                    tsm.complete_transaction(
657                        source_mac,
658                        ack.invoke_id,
659                        TsmResponse::ComplexAck {
660                            service_data: ack.service_ack,
661                        },
662                    );
663                }
664            }
665            Apdu::Error(err) => {
666                debug!(invoke_id = err.invoke_id, "Received Error PDU");
667                let mut tsm = tsm.lock().await;
668                tsm.complete_transaction(
669                    source_mac,
670                    err.invoke_id,
671                    TsmResponse::Error {
672                        class: err.error_class.to_raw() as u32,
673                        code: err.error_code.to_raw() as u32,
674                    },
675                );
676            }
677            Apdu::Reject(rej) => {
678                debug!(invoke_id = rej.invoke_id, "Received Reject PDU");
679                let mut tsm = tsm.lock().await;
680                tsm.complete_transaction(
681                    source_mac,
682                    rej.invoke_id,
683                    TsmResponse::Reject {
684                        reason: rej.reject_reason.to_raw(),
685                    },
686                );
687            }
688            Apdu::Abort(abt) => {
689                debug!(invoke_id = abt.invoke_id, "Received Abort PDU");
690                let mut tsm = tsm.lock().await;
691                tsm.complete_transaction(
692                    source_mac,
693                    abt.invoke_id,
694                    TsmResponse::Abort {
695                        reason: abt.abort_reason.to_raw(),
696                    },
697                );
698            }
699            Apdu::ConfirmedRequest(req) => {
700                if req.service_choice == ConfirmedServiceChoice::CONFIRMED_COV_NOTIFICATION {
701                    match COVNotificationRequest::decode(&req.service_request) {
702                        Ok(notification) => {
703                            debug!(
704                                object = ?notification.monitored_object_identifier,
705                                "Received ConfirmedCOVNotification"
706                            );
707                            let _ = cov_tx.send(notification);
708
709                            let ack = Apdu::SimpleAck(SimpleAck {
710                                invoke_id: req.invoke_id,
711                                service_choice: req.service_choice,
712                            });
713                            let mut buf = BytesMut::with_capacity(4);
714                            encode_apdu(&mut buf, &ack);
715                            if let Err(e) = network
716                                .send_apdu(&buf, source_mac, false, NetworkPriority::NORMAL)
717                                .await
718                            {
719                                warn!(error = %e, "Failed to send SimpleAck for COV notification");
720                            }
721                        }
722                        Err(e) => {
723                            warn!(error = %e, "Failed to decode ConfirmedCOVNotification");
724                        }
725                    }
726                } else {
727                    debug!(
728                        service = req.service_choice.to_raw(),
729                        "Ignoring ConfirmedRequest (client mode)"
730                    );
731                }
732            }
733            Apdu::UnconfirmedRequest(req) => {
734                if req.service_choice == UnconfirmedServiceChoice::I_AM {
735                    match bacnet_services::who_is::IAmRequest::decode(&req.service_request) {
736                        Ok(i_am) => {
737                            debug!(
738                                device = i_am.object_identifier.instance_number(),
739                                vendor = i_am.vendor_id,
740                                "Received IAm"
741                            );
742                            let (src_net, src_addr) = match source_network {
743                                Some(npdu_addr) if !npdu_addr.mac_address.is_empty() => {
744                                    (Some(npdu_addr.network), Some(npdu_addr.mac_address.clone()))
745                                }
746                                _ => (None, None),
747                            };
748                            let device = DiscoveredDevice {
749                                object_identifier: i_am.object_identifier,
750                                mac_address: MacAddr::from_slice(source_mac),
751                                max_apdu_length: i_am.max_apdu_length,
752                                segmentation_supported: i_am.segmentation_supported,
753                                max_segments_accepted: None,
754                                vendor_id: i_am.vendor_id,
755                                last_seen: std::time::Instant::now(),
756                                source_network: src_net,
757                                source_address: src_addr,
758                            };
759                            device_table.lock().await.upsert(device);
760                        }
761                        Err(e) => {
762                            warn!(error = %e, "Failed to decode IAm");
763                        }
764                    }
765                } else if req.service_choice
766                    == UnconfirmedServiceChoice::UNCONFIRMED_COV_NOTIFICATION
767                {
768                    match COVNotificationRequest::decode(&req.service_request) {
769                        Ok(notification) => {
770                            debug!(
771                                object = ?notification.monitored_object_identifier,
772                                "Received UnconfirmedCOVNotification"
773                            );
774                            let _ = cov_tx.send(notification);
775                        }
776                        Err(e) => {
777                            warn!(error = %e, "Failed to decode UnconfirmedCOVNotification");
778                        }
779                    }
780                } else {
781                    debug!(
782                        service = req.service_choice.to_raw(),
783                        "Ignoring unconfirmed service in client dispatch"
784                    );
785                }
786            }
787            Apdu::SegmentAck(sa) => {
788                let key = (MacAddr::from_slice(source_mac), sa.invoke_id);
789                let senders = seg_ack_senders.lock().await;
790                if let Some(tx) = senders.get(&key) {
791                    let _ = tx.try_send(sa);
792                } else {
793                    debug!(
794                        invoke_id = sa.invoke_id,
795                        "Ignoring SegmentAck for unknown transaction"
796                    );
797                }
798            }
799        }
800    }
801
802    /// Handle a segmented ComplexAck: accumulate segments, send SegmentAcks,
803    /// and reassemble when all segments are received.
804    async fn handle_segmented_complex_ack(
805        tsm: &Arc<Mutex<Tsm>>,
806        network: &Arc<NetworkLayer<T>>,
807        seg_state: &mut HashMap<SegKey, SegmentedReceiveState>,
808        source_mac: &[u8],
809        ack: bacnet_encoding::apdu::ComplexAck,
810        segmented_response_accepted: bool,
811    ) {
812        let seq = ack.sequence_number.unwrap_or(0);
813        let key = (MacAddr::from_slice(source_mac), ack.invoke_id);
814
815        debug!(
816            invoke_id = ack.invoke_id,
817            seq = seq,
818            more = ack.more_follows,
819            "Received segmented ComplexAck"
820        );
821
822        // If client doesn't support segmented reception, send Abort per Clause 5.4.4.2
823        if !segmented_response_accepted {
824            let abort = Apdu::Abort(AbortPdu {
825                sent_by_server: false,
826                invoke_id: ack.invoke_id,
827                abort_reason: bacnet_types::enums::AbortReason::SEGMENTATION_NOT_SUPPORTED,
828            });
829            let mut buf = BytesMut::with_capacity(4);
830            encode_apdu(&mut buf, &abort);
831            let _ = network
832                .send_apdu(&buf, source_mac, false, NetworkPriority::NORMAL)
833                .await;
834            return;
835        }
836
837        const MAX_CONCURRENT_SEG_SESSIONS: usize = 64;
838        if !seg_state.contains_key(&key) && seg_state.len() >= MAX_CONCURRENT_SEG_SESSIONS {
839            warn!(
840                invoke_id = ack.invoke_id,
841                sessions = seg_state.len(),
842                "Max concurrent segmented sessions reached, dropping segment"
843            );
844            return;
845        }
846
847        let proposed_ws = ack.proposed_window_size.unwrap_or(1);
848        let state = seg_state
849            .entry(key.clone())
850            .or_insert_with(|| SegmentedReceiveState {
851                receiver: SegmentReceiver::new(),
852                expected_next_seq: 0,
853                last_activity: Instant::now(),
854                window_position: 0,
855                proposed_window_size: proposed_ws,
856            });
857
858        state.last_activity = Instant::now();
859
860        if seq != state.expected_next_seq {
861            // Check for duplicate (already received) vs true gap
862            if seq < state.expected_next_seq {
863                // Duplicate segment — discard silently and ack
864                debug!(
865                    invoke_id = ack.invoke_id,
866                    seq, "Discarding duplicate segment"
867                );
868            } else {
869                // True gap — send negative SegmentAck with last correctly received seq
870                warn!(
871                    invoke_id = ack.invoke_id,
872                    expected = state.expected_next_seq,
873                    received = seq,
874                    "Segment gap detected, sending negative SegmentAck"
875                );
876            }
877            let neg_ack = Apdu::SegmentAck(SegmentAckPdu {
878                negative_ack: seq >= state.expected_next_seq,
879                sent_by_server: false,
880                invoke_id: ack.invoke_id,
881                // Spec: sequence_number = last correctly received sequence number
882                sequence_number: if state.expected_next_seq > 0 {
883                    state.expected_next_seq.wrapping_sub(1)
884                } else {
885                    0
886                },
887                actual_window_size: proposed_ws,
888            });
889            let mut buf = BytesMut::with_capacity(4);
890            encode_apdu(&mut buf, &neg_ack);
891            if let Err(e) = network
892                .send_apdu(&buf, source_mac, false, NetworkPriority::NORMAL)
893                .await
894            {
895                warn!(error = %e, "Failed to send SegmentAck");
896            }
897            return;
898        }
899
900        if let Err(e) = state.receiver.receive(seq, ack.service_ack) {
901            warn!(error = %e, "Rejecting oversized segment");
902            return;
903        }
904        state.expected_next_seq = seq.wrapping_add(1);
905        state.window_position += 1;
906
907        // Per-window SegmentAck: only ack at window boundary or final segment (Clause 5.2.2)
908        let should_ack = !ack.more_follows || state.window_position >= state.proposed_window_size;
909
910        if should_ack {
911            state.window_position = 0;
912            let seg_ack = Apdu::SegmentAck(SegmentAckPdu {
913                negative_ack: false,
914                sent_by_server: false,
915                invoke_id: ack.invoke_id,
916                sequence_number: seq,
917                actual_window_size: proposed_ws,
918            });
919            let mut buf = BytesMut::with_capacity(4);
920            encode_apdu(&mut buf, &seg_ack);
921            if let Err(e) = network
922                .send_apdu(&buf, source_mac, false, NetworkPriority::NORMAL)
923                .await
924            {
925                warn!(error = %e, "Failed to send SegmentAck");
926            }
927        }
928
929        if !ack.more_follows {
930            let state = seg_state.remove(&key).unwrap();
931            let total = state.receiver.received_count();
932            match state.receiver.reassemble(total) {
933                Ok(service_data) => {
934                    debug!(
935                        invoke_id = ack.invoke_id,
936                        segments = total,
937                        bytes = service_data.len(),
938                        "Reassembled segmented ComplexAck"
939                    );
940                    let mut tsm = tsm.lock().await;
941                    tsm.complete_transaction(
942                        source_mac,
943                        ack.invoke_id,
944                        TsmResponse::ComplexAck {
945                            service_data: Bytes::from(service_data),
946                        },
947                    );
948                }
949                Err(e) => {
950                    warn!(error = %e, "Failed to reassemble segmented ComplexAck");
951                }
952            }
953        }
954    }
955
956    /// Get the client's local MAC address.
957    pub fn local_mac(&self) -> &[u8] {
958        &self.local_mac
959    }
960
961    /// Send a confirmed request and wait for the response.
962    ///
963    /// Returns the service response data (empty for SimpleAck). Automatically
964    /// uses segmented transfer when the payload exceeds the remote device's
965    /// max APDU length.
966    pub async fn confirmed_request(
967        &self,
968        destination_mac: &[u8],
969        service_choice: ConfirmedServiceChoice,
970        service_data: &[u8],
971    ) -> Result<Bytes, Error> {
972        self.confirmed_request_inner(
973            ConfirmedTarget::Local {
974                mac: destination_mac,
975            },
976            service_choice,
977            service_data,
978        )
979        .await
980    }
981
982    /// Send a confirmed request routed through a BACnet router.
983    ///
984    /// The NPDU is sent as a unicast to `router_mac` with DNET/DADR set so
985    /// the router forwards it to `dest_network`/`dest_mac`.
986    pub async fn confirmed_request_routed(
987        &self,
988        router_mac: &[u8],
989        dest_network: u16,
990        dest_mac: &[u8],
991        service_choice: ConfirmedServiceChoice,
992        service_data: &[u8],
993    ) -> Result<Bytes, Error> {
994        self.confirmed_request_inner(
995            ConfirmedTarget::Routed {
996                router_mac,
997                dest_network,
998                dest_mac,
999            },
1000            service_choice,
1001            service_data,
1002        )
1003        .await
1004    }
1005
1006    async fn confirmed_request_inner(
1007        &self,
1008        target: ConfirmedTarget<'_>,
1009        service_choice: ConfirmedServiceChoice,
1010        service_data: &[u8],
1011    ) -> Result<Bytes, Error> {
1012        let tsm_mac = target.tsm_mac();
1013
1014        if let ConfirmedTarget::Local { mac } = &target {
1015            let unsegmented_apdu_size = 4 + service_data.len();
1016            let (remote_max_apdu, remote_max_segments) = {
1017                let dt = self.device_table.lock().await;
1018                let device = dt.get_by_mac(mac);
1019                let max_apdu = device
1020                    .map(|d| d.max_apdu_length as u16)
1021                    .unwrap_or(self.config.max_apdu_length);
1022                let max_seg = device.and_then(|d| d.max_segments_accepted);
1023                (max_apdu, max_seg)
1024            };
1025            if unsegmented_apdu_size > remote_max_apdu as usize {
1026                return self
1027                    .segmented_confirmed_request(
1028                        mac,
1029                        service_choice,
1030                        service_data,
1031                        remote_max_apdu,
1032                        remote_max_segments,
1033                    )
1034                    .await;
1035            }
1036        }
1037
1038        let (invoke_id, rx) = {
1039            let mut tsm = self.tsm.lock().await;
1040            let invoke_id = tsm.allocate_invoke_id(tsm_mac).ok_or_else(|| {
1041                Error::Encoding("all invoke IDs exhausted for destination".into())
1042            })?;
1043            let rx = tsm.register_transaction(MacAddr::from_slice(tsm_mac), invoke_id);
1044            (invoke_id, rx)
1045        };
1046
1047        // Guard cleans up invoke ID if this task is cancelled/aborted
1048        let mut guard = crate::tsm::TsmGuard::new(
1049            std::sync::Arc::clone(&self.tsm),
1050            MacAddr::from_slice(tsm_mac),
1051            invoke_id,
1052        );
1053
1054        let pdu = Apdu::ConfirmedRequest(ConfirmedRequestPdu {
1055            segmented: false,
1056            more_follows: false,
1057            segmented_response_accepted: self.config.segmented_response_accepted,
1058            max_segments: self.config.max_segments,
1059            max_apdu_length: self.config.max_apdu_length,
1060            invoke_id,
1061            sequence_number: None,
1062            proposed_window_size: None,
1063            service_choice,
1064            service_request: Bytes::copy_from_slice(service_data),
1065        });
1066
1067        let mut buf = BytesMut::with_capacity(6 + service_data.len());
1068        encode_apdu(&mut buf, &pdu);
1069
1070        let timeout_duration = Duration::from_millis(self.config.apdu_timeout_ms);
1071        let max_retries = self.config.apdu_retries;
1072        let mut attempts: u8 = 0;
1073        let mut rx = rx;
1074
1075        loop {
1076            let send_result = match &target {
1077                ConfirmedTarget::Local { mac } => {
1078                    self.network
1079                        .send_apdu(&buf, mac, true, NetworkPriority::NORMAL)
1080                        .await
1081                }
1082                ConfirmedTarget::Routed {
1083                    router_mac,
1084                    dest_network,
1085                    dest_mac,
1086                } => {
1087                    self.network
1088                        .send_apdu_routed(
1089                            &buf,
1090                            *dest_network,
1091                            dest_mac,
1092                            router_mac,
1093                            true,
1094                            NetworkPriority::NORMAL,
1095                        )
1096                        .await
1097                }
1098            };
1099            if let Err(e) = send_result {
1100                guard.mark_completed();
1101                let mut tsm = self.tsm.lock().await;
1102                tsm.cancel_transaction(tsm_mac, invoke_id);
1103                return Err(e);
1104            }
1105
1106            match timeout(timeout_duration, &mut rx).await {
1107                Ok(Ok(response)) => {
1108                    guard.mark_completed();
1109                    return match response {
1110                        TsmResponse::SimpleAck => Ok(Bytes::new()),
1111                        TsmResponse::ComplexAck { service_data } => Ok(service_data),
1112                        TsmResponse::Error { class, code } => Err(Error::Protocol { class, code }),
1113                        TsmResponse::Reject { reason } => Err(Error::Reject { reason }),
1114                        TsmResponse::Abort { reason } => Err(Error::Abort { reason }),
1115                    };
1116                }
1117                Ok(Err(_)) => {
1118                    guard.mark_completed();
1119                    return Err(Error::Encoding("TSM response channel closed".into()));
1120                }
1121                Err(_timeout) => {
1122                    attempts += 1;
1123                    if attempts > max_retries {
1124                        guard.mark_completed();
1125                        let mut tsm = self.tsm.lock().await;
1126                        tsm.cancel_transaction(tsm_mac, invoke_id);
1127                        return Err(Error::Timeout(timeout_duration));
1128                    }
1129                    debug!(
1130                        invoke_id,
1131                        attempt = attempts,
1132                        max_retries,
1133                        "APDU timeout, retrying confirmed request"
1134                    );
1135                }
1136            }
1137        }
1138    }
1139
1140    /// Send a confirmed request using segmented transfer with windowed flow control.
1141    async fn segmented_confirmed_request(
1142        &self,
1143        destination_mac: &[u8],
1144        service_choice: ConfirmedServiceChoice,
1145        service_data: &[u8],
1146        remote_max_apdu: u16,
1147        remote_max_segments: Option<u32>,
1148    ) -> Result<Bytes, Error> {
1149        let max_seg_size = max_segment_payload(remote_max_apdu, SegmentedPduType::ConfirmedRequest);
1150        let segments = split_payload(service_data, max_seg_size);
1151        let total_segments = segments.len();
1152
1153        if total_segments > 256 {
1154            return Err(Error::Segmentation(format!(
1155                "payload requires {} segments, maximum is 256",
1156                total_segments
1157            )));
1158        }
1159
1160        if let Some(max_seg) = remote_max_segments {
1161            if total_segments > max_seg as usize {
1162                return Err(Error::Segmentation(format!(
1163                    "request requires {} segments but remote accepts at most {}",
1164                    total_segments, max_seg
1165                )));
1166            }
1167        }
1168
1169        debug!(
1170            total_segments,
1171            max_seg_size,
1172            service_data_len = service_data.len(),
1173            "Starting segmented confirmed request"
1174        );
1175
1176        let (invoke_id, rx) = {
1177            let mut tsm = self.tsm.lock().await;
1178            let invoke_id = tsm.allocate_invoke_id(destination_mac).ok_or_else(|| {
1179                Error::Encoding("all invoke IDs exhausted for destination".into())
1180            })?;
1181            let rx = tsm.register_transaction(MacAddr::from_slice(destination_mac), invoke_id);
1182            (invoke_id, rx)
1183        };
1184
1185        let (seg_ack_tx, mut seg_ack_rx) = mpsc::channel(16);
1186        {
1187            let key = (MacAddr::from_slice(destination_mac), invoke_id);
1188            self.seg_ack_senders.lock().await.insert(key, seg_ack_tx);
1189        }
1190
1191        // Tseg: use APDU timeout for now (configurable via apdu_timeout_ms)
1192        let timeout_duration = Duration::from_millis(self.config.apdu_timeout_ms);
1193        let max_ack_retries = self.config.apdu_retries;
1194        let mut window_size = self.config.proposed_window_size.max(1) as usize;
1195        let mut next_seq: usize = 0;
1196        let mut neg_ack_retries: u32 = 0;
1197        const MAX_NEG_ACK_RETRIES: u32 = 10;
1198
1199        let result = async {
1200            while next_seq < total_segments {
1201                let window_end = (next_seq + window_size).min(total_segments);
1202
1203                for (seq, segment_data) in segments[next_seq..window_end]
1204                    .iter()
1205                    .enumerate()
1206                    .map(|(i, s)| (next_seq + i, s))
1207                {
1208                    let is_last = seq == total_segments - 1;
1209                    let pdu = Apdu::ConfirmedRequest(ConfirmedRequestPdu {
1210                        segmented: true,
1211                        more_follows: !is_last,
1212                        segmented_response_accepted: self.config.segmented_response_accepted,
1213                        max_segments: self.config.max_segments,
1214                        max_apdu_length: self.config.max_apdu_length,
1215                        invoke_id,
1216                        sequence_number: Some(seq as u8),
1217                        proposed_window_size: Some(self.config.proposed_window_size.max(1)),
1218                        service_choice,
1219                        service_request: segment_data.clone(),
1220                    });
1221
1222                    let mut buf = BytesMut::with_capacity(remote_max_apdu as usize);
1223                    encode_apdu(&mut buf, &pdu);
1224
1225                    self.network
1226                        .send_apdu(&buf, destination_mac, true, NetworkPriority::NORMAL)
1227                        .await?;
1228
1229                    debug!(seq, is_last, "Sent segment");
1230                }
1231
1232                let ack = {
1233                    let mut ack_retries: u8 = 0;
1234                    loop {
1235                        match timeout(timeout_duration, seg_ack_rx.recv()).await {
1236                            Ok(Some(ack)) => break ack,
1237                            Ok(None) => {
1238                                return Err(Error::Encoding("SegmentAck channel closed".into()));
1239                            }
1240                            Err(_timeout) => {
1241                                ack_retries += 1;
1242                                if ack_retries > max_ack_retries {
1243                                    return Err(Error::Timeout(timeout_duration));
1244                                }
1245                                warn!(
1246                                    attempt = ack_retries,
1247                                    "Retransmitting segmented request window"
1248                                );
1249                                for (seq, segment_data) in segments[next_seq..window_end]
1250                                    .iter()
1251                                    .enumerate()
1252                                    .map(|(i, s)| (next_seq + i, s))
1253                                {
1254                                    let is_last = seq == total_segments - 1;
1255                                    let pdu = Apdu::ConfirmedRequest(ConfirmedRequestPdu {
1256                                        segmented: true,
1257                                        more_follows: !is_last,
1258                                        segmented_response_accepted: self
1259                                            .config
1260                                            .segmented_response_accepted,
1261                                        max_segments: self.config.max_segments,
1262                                        max_apdu_length: self.config.max_apdu_length,
1263                                        invoke_id,
1264                                        sequence_number: Some(seq as u8),
1265                                        proposed_window_size: Some(
1266                                            self.config.proposed_window_size.max(1),
1267                                        ),
1268                                        service_choice,
1269                                        service_request: segment_data.clone(),
1270                                    });
1271
1272                                    let mut buf = BytesMut::with_capacity(remote_max_apdu as usize);
1273                                    encode_apdu(&mut buf, &pdu);
1274
1275                                    self.network
1276                                        .send_apdu(
1277                                            &buf,
1278                                            destination_mac,
1279                                            true,
1280                                            NetworkPriority::NORMAL,
1281                                        )
1282                                        .await?;
1283                                }
1284                            }
1285                        }
1286                    }
1287                };
1288
1289                debug!(
1290                    seq = ack.sequence_number,
1291                    negative = ack.negative_ack,
1292                    window = ack.actual_window_size,
1293                    "Received SegmentAck"
1294                );
1295
1296                window_size = ack.actual_window_size.max(1) as usize;
1297
1298                let ack_seq = ack.sequence_number as usize;
1299                if ack_seq >= total_segments {
1300                    return Err(Error::Segmentation(format!(
1301                        "SegmentAck sequence {} out of range (total {})",
1302                        ack_seq, total_segments
1303                    )));
1304                }
1305
1306                if ack.negative_ack {
1307                    neg_ack_retries += 1;
1308                    if neg_ack_retries > MAX_NEG_ACK_RETRIES {
1309                        return Err(Error::Segmentation(
1310                            "too many negative SegmentAck retransmissions".into(),
1311                        ));
1312                    }
1313                    next_seq = ack_seq;
1314                } else {
1315                    neg_ack_retries = 0;
1316                    next_seq = ack_seq + 1;
1317                }
1318            }
1319
1320            timeout(timeout_duration, rx)
1321                .await
1322                .map_err(|_| Error::Timeout(timeout_duration))?
1323                .map_err(|_| Error::Encoding("TSM response channel closed".into()))
1324        }
1325        .await;
1326
1327        {
1328            let key = (MacAddr::from_slice(destination_mac), invoke_id);
1329            self.seg_ack_senders.lock().await.remove(&key);
1330        }
1331
1332        let response = match result {
1333            Ok(response) => response,
1334            Err(e) => {
1335                let mut tsm = self.tsm.lock().await;
1336                tsm.cancel_transaction(destination_mac, invoke_id);
1337                return Err(e);
1338            }
1339        };
1340
1341        match response {
1342            TsmResponse::SimpleAck => Ok(Bytes::new()),
1343            TsmResponse::ComplexAck { service_data } => Ok(service_data),
1344            TsmResponse::Error { class, code } => Err(Error::Protocol { class, code }),
1345            TsmResponse::Reject { reason } => Err(Error::Reject { reason }),
1346            TsmResponse::Abort { reason } => Err(Error::Abort { reason }),
1347        }
1348    }
1349
1350    /// Send an unconfirmed request (fire-and-forget) to a specific destination.
1351    pub async fn unconfirmed_request(
1352        &self,
1353        destination_mac: &[u8],
1354        service_choice: UnconfirmedServiceChoice,
1355        service_data: &[u8],
1356    ) -> Result<(), Error> {
1357        let pdu = Apdu::UnconfirmedRequest(bacnet_encoding::apdu::UnconfirmedRequest {
1358            service_choice,
1359            service_request: Bytes::copy_from_slice(service_data),
1360        });
1361
1362        let mut buf = BytesMut::with_capacity(2 + service_data.len());
1363        encode_apdu(&mut buf, &pdu);
1364
1365        self.network
1366            .send_apdu(&buf, destination_mac, false, NetworkPriority::NORMAL)
1367            .await
1368    }
1369
1370    /// Broadcast an unconfirmed request on the local network.
1371    pub async fn broadcast_unconfirmed(
1372        &self,
1373        service_choice: UnconfirmedServiceChoice,
1374        service_data: &[u8],
1375    ) -> Result<(), Error> {
1376        let pdu = Apdu::UnconfirmedRequest(bacnet_encoding::apdu::UnconfirmedRequest {
1377            service_choice,
1378            service_request: Bytes::copy_from_slice(service_data),
1379        });
1380
1381        let mut buf = BytesMut::with_capacity(2 + service_data.len());
1382        encode_apdu(&mut buf, &pdu);
1383
1384        self.network
1385            .broadcast_apdu(&buf, false, NetworkPriority::NORMAL)
1386            .await
1387    }
1388
1389    /// Broadcast an unconfirmed request globally (DNET=0xFFFF).
1390    pub async fn broadcast_global_unconfirmed(
1391        &self,
1392        service_choice: UnconfirmedServiceChoice,
1393        service_data: &[u8],
1394    ) -> Result<(), Error> {
1395        let pdu = Apdu::UnconfirmedRequest(bacnet_encoding::apdu::UnconfirmedRequest {
1396            service_choice,
1397            service_request: Bytes::copy_from_slice(service_data),
1398        });
1399
1400        let mut buf = BytesMut::with_capacity(2 + service_data.len());
1401        encode_apdu(&mut buf, &pdu);
1402
1403        self.network
1404            .broadcast_global_apdu(&buf, false, NetworkPriority::NORMAL)
1405            .await
1406    }
1407
1408    /// Broadcast an unconfirmed request to a specific remote network.
1409    pub async fn broadcast_network_unconfirmed(
1410        &self,
1411        service_choice: UnconfirmedServiceChoice,
1412        service_data: &[u8],
1413        dest_network: u16,
1414    ) -> Result<(), Error> {
1415        let pdu = Apdu::UnconfirmedRequest(bacnet_encoding::apdu::UnconfirmedRequest {
1416            service_choice,
1417            service_request: Bytes::copy_from_slice(service_data),
1418        });
1419
1420        let mut buf = BytesMut::with_capacity(2 + service_data.len());
1421        encode_apdu(&mut buf, &pdu);
1422
1423        self.network
1424            .broadcast_to_network(&buf, dest_network, false, NetworkPriority::NORMAL)
1425            .await
1426    }
1427
1428    /// Read a property from a remote device.
1429    pub async fn read_property(
1430        &self,
1431        destination_mac: &[u8],
1432        object_identifier: bacnet_types::primitives::ObjectIdentifier,
1433        property_identifier: bacnet_types::enums::PropertyIdentifier,
1434        property_array_index: Option<u32>,
1435    ) -> Result<bacnet_services::read_property::ReadPropertyACK, Error> {
1436        use bacnet_services::read_property::ReadPropertyRequest;
1437
1438        let request = ReadPropertyRequest {
1439            object_identifier,
1440            property_identifier,
1441            property_array_index,
1442        };
1443        let mut buf = BytesMut::new();
1444        request.encode(&mut buf);
1445
1446        let response_data = self
1447            .confirmed_request(destination_mac, ConfirmedServiceChoice::READ_PROPERTY, &buf)
1448            .await?;
1449
1450        bacnet_services::read_property::ReadPropertyACK::decode(&response_data)
1451    }
1452
1453    /// Read a property from a discovered device, auto-routing if needed.
1454    pub async fn read_property_from_device(
1455        &self,
1456        device_instance: u32,
1457        object_identifier: bacnet_types::primitives::ObjectIdentifier,
1458        property_identifier: bacnet_types::enums::PropertyIdentifier,
1459        property_array_index: Option<u32>,
1460    ) -> Result<bacnet_services::read_property::ReadPropertyACK, Error> {
1461        let (mac, routing) = {
1462            let dt = self.device_table.lock().await;
1463            let device = dt.get(device_instance).ok_or_else(|| {
1464                Error::Encoding(format!("device {device_instance} not in device table"))
1465            })?;
1466            let routing = match (&device.source_network, &device.source_address) {
1467                (Some(snet), Some(sadr)) => Some((*snet, sadr.to_vec())),
1468                _ => None,
1469            };
1470            (device.mac_address.to_vec(), routing)
1471        };
1472
1473        if let Some((dnet, dadr)) = routing {
1474            self.read_property_routed(
1475                &mac,
1476                dnet,
1477                &dadr,
1478                object_identifier,
1479                property_identifier,
1480                property_array_index,
1481            )
1482            .await
1483        } else {
1484            self.read_property(
1485                &mac,
1486                object_identifier,
1487                property_identifier,
1488                property_array_index,
1489            )
1490            .await
1491        }
1492    }
1493
1494    /// Read a property from a device on a remote BACnet network via a router.
1495    pub async fn read_property_routed(
1496        &self,
1497        router_mac: &[u8],
1498        dest_network: u16,
1499        dest_mac: &[u8],
1500        object_identifier: bacnet_types::primitives::ObjectIdentifier,
1501        property_identifier: bacnet_types::enums::PropertyIdentifier,
1502        property_array_index: Option<u32>,
1503    ) -> Result<bacnet_services::read_property::ReadPropertyACK, Error> {
1504        use bacnet_services::read_property::ReadPropertyRequest;
1505
1506        let request = ReadPropertyRequest {
1507            object_identifier,
1508            property_identifier,
1509            property_array_index,
1510        };
1511        let mut buf = BytesMut::new();
1512        request.encode(&mut buf);
1513
1514        let response_data = self
1515            .confirmed_request_routed(
1516                router_mac,
1517                dest_network,
1518                dest_mac,
1519                ConfirmedServiceChoice::READ_PROPERTY,
1520                &buf,
1521            )
1522            .await?;
1523
1524        bacnet_services::read_property::ReadPropertyACK::decode(&response_data)
1525    }
1526
1527    /// Write a property on a remote device.
1528    pub async fn write_property(
1529        &self,
1530        destination_mac: &[u8],
1531        object_identifier: bacnet_types::primitives::ObjectIdentifier,
1532        property_identifier: bacnet_types::enums::PropertyIdentifier,
1533        property_array_index: Option<u32>,
1534        property_value: Vec<u8>,
1535        priority: Option<u8>,
1536    ) -> Result<(), Error> {
1537        use bacnet_services::write_property::WritePropertyRequest;
1538
1539        let request = WritePropertyRequest {
1540            object_identifier,
1541            property_identifier,
1542            property_array_index,
1543            property_value,
1544            priority,
1545        };
1546        let mut buf = BytesMut::new();
1547        request.encode(&mut buf);
1548
1549        let _ = self
1550            .confirmed_request(
1551                destination_mac,
1552                ConfirmedServiceChoice::WRITE_PROPERTY,
1553                &buf,
1554            )
1555            .await?;
1556
1557        Ok(())
1558    }
1559
1560    /// Read multiple properties from one or more objects on a remote device.
1561    pub async fn read_property_multiple(
1562        &self,
1563        destination_mac: &[u8],
1564        specs: Vec<bacnet_services::rpm::ReadAccessSpecification>,
1565    ) -> Result<bacnet_services::rpm::ReadPropertyMultipleACK, Error> {
1566        use bacnet_services::rpm::{ReadPropertyMultipleACK, ReadPropertyMultipleRequest};
1567
1568        let request = ReadPropertyMultipleRequest {
1569            list_of_read_access_specs: specs,
1570        };
1571        let mut buf = BytesMut::new();
1572        request.encode(&mut buf);
1573
1574        let response_data = self
1575            .confirmed_request(
1576                destination_mac,
1577                ConfirmedServiceChoice::READ_PROPERTY_MULTIPLE,
1578                &buf,
1579            )
1580            .await?;
1581
1582        ReadPropertyMultipleACK::decode(&response_data)
1583    }
1584
1585    /// Write multiple properties on one or more objects on a remote device.
1586    pub async fn write_property_multiple(
1587        &self,
1588        destination_mac: &[u8],
1589        specs: Vec<bacnet_services::wpm::WriteAccessSpecification>,
1590    ) -> Result<(), Error> {
1591        use bacnet_services::wpm::WritePropertyMultipleRequest;
1592
1593        let request = WritePropertyMultipleRequest {
1594            list_of_write_access_specs: specs,
1595        };
1596        let mut buf = BytesMut::new();
1597        request.encode(&mut buf);
1598
1599        let _ = self
1600            .confirmed_request(
1601                destination_mac,
1602                ConfirmedServiceChoice::WRITE_PROPERTY_MULTIPLE,
1603                &buf,
1604            )
1605            .await?;
1606
1607        Ok(())
1608    }
1609
1610    // -----------------------------------------------------------------------
1611    // Auto-routing _from_device variants (RPM, WP, WPM)
1612    // -----------------------------------------------------------------------
1613
1614    /// Read multiple properties from a discovered device, auto-routing if needed.
1615    pub async fn read_property_multiple_from_device(
1616        &self,
1617        device_instance: u32,
1618        specs: Vec<bacnet_services::rpm::ReadAccessSpecification>,
1619    ) -> Result<bacnet_services::rpm::ReadPropertyMultipleACK, Error> {
1620        let (mac, routing) = self.resolve_device(device_instance).await?;
1621
1622        if let Some((dnet, dadr)) = routing {
1623            use bacnet_services::rpm::{ReadPropertyMultipleACK, ReadPropertyMultipleRequest};
1624
1625            let request = ReadPropertyMultipleRequest {
1626                list_of_read_access_specs: specs,
1627            };
1628            let mut buf = BytesMut::new();
1629            request.encode(&mut buf);
1630
1631            let response_data = self
1632                .confirmed_request_routed(
1633                    &mac,
1634                    dnet,
1635                    &dadr,
1636                    ConfirmedServiceChoice::READ_PROPERTY_MULTIPLE,
1637                    &buf,
1638                )
1639                .await?;
1640
1641            ReadPropertyMultipleACK::decode(&response_data)
1642        } else {
1643            self.read_property_multiple(&mac, specs).await
1644        }
1645    }
1646
1647    /// Write a property on a discovered device, auto-routing if needed.
1648    pub async fn write_property_to_device(
1649        &self,
1650        device_instance: u32,
1651        object_identifier: bacnet_types::primitives::ObjectIdentifier,
1652        property_identifier: bacnet_types::enums::PropertyIdentifier,
1653        property_array_index: Option<u32>,
1654        property_value: Vec<u8>,
1655        priority: Option<u8>,
1656    ) -> Result<(), Error> {
1657        let (mac, routing) = self.resolve_device(device_instance).await?;
1658
1659        if let Some((dnet, dadr)) = routing {
1660            use bacnet_services::write_property::WritePropertyRequest;
1661
1662            let request = WritePropertyRequest {
1663                object_identifier,
1664                property_identifier,
1665                property_array_index,
1666                property_value,
1667                priority,
1668            };
1669            let mut buf = BytesMut::new();
1670            request.encode(&mut buf);
1671
1672            let _ = self
1673                .confirmed_request_routed(
1674                    &mac,
1675                    dnet,
1676                    &dadr,
1677                    ConfirmedServiceChoice::WRITE_PROPERTY,
1678                    &buf,
1679                )
1680                .await?;
1681            Ok(())
1682        } else {
1683            self.write_property(
1684                &mac,
1685                object_identifier,
1686                property_identifier,
1687                property_array_index,
1688                property_value,
1689                priority,
1690            )
1691            .await
1692        }
1693    }
1694
1695    /// Write multiple properties on a discovered device, auto-routing if needed.
1696    pub async fn write_property_multiple_to_device(
1697        &self,
1698        device_instance: u32,
1699        specs: Vec<bacnet_services::wpm::WriteAccessSpecification>,
1700    ) -> Result<(), Error> {
1701        let (mac, routing) = self.resolve_device(device_instance).await?;
1702
1703        if let Some((dnet, dadr)) = routing {
1704            use bacnet_services::wpm::WritePropertyMultipleRequest;
1705
1706            let request = WritePropertyMultipleRequest {
1707                list_of_write_access_specs: specs,
1708            };
1709            let mut buf = BytesMut::new();
1710            request.encode(&mut buf);
1711
1712            let _ = self
1713                .confirmed_request_routed(
1714                    &mac,
1715                    dnet,
1716                    &dadr,
1717                    ConfirmedServiceChoice::WRITE_PROPERTY_MULTIPLE,
1718                    &buf,
1719                )
1720                .await?;
1721            Ok(())
1722        } else {
1723            self.write_property_multiple(&mac, specs).await
1724        }
1725    }
1726
1727    /// Resolve a device instance to its MAC address and optional routing info.
1728    async fn resolve_device(
1729        &self,
1730        device_instance: u32,
1731    ) -> Result<(Vec<u8>, Option<(u16, Vec<u8>)>), Error> {
1732        let dt = self.device_table.lock().await;
1733        let device = dt.get(device_instance).ok_or_else(|| {
1734            Error::Encoding(format!("device {device_instance} not in device table"))
1735        })?;
1736        let routing = match (&device.source_network, &device.source_address) {
1737            (Some(snet), Some(sadr)) => Some((*snet, sadr.to_vec())),
1738            _ => None,
1739        };
1740        Ok((device.mac_address.to_vec(), routing))
1741    }
1742
1743    // -----------------------------------------------------------------------
1744    // Multi-device batch operations
1745    // -----------------------------------------------------------------------
1746
1747    /// Read a property from multiple discovered devices concurrently.
1748    ///
1749    /// All requests are dispatched concurrently (up to `max_concurrent`,
1750    /// default 32) and results are returned in completion order. Each device
1751    /// is resolved from the device table and auto-routed if behind a router.
1752    pub async fn read_property_from_devices(
1753        &self,
1754        requests: Vec<DeviceReadRequest>,
1755        max_concurrent: Option<usize>,
1756    ) -> Vec<DeviceReadResult> {
1757        use futures_util::stream::{self, StreamExt};
1758
1759        let concurrency = max_concurrent.unwrap_or(DEFAULT_BATCH_CONCURRENCY);
1760
1761        stream::iter(requests)
1762            .map(|req| async move {
1763                let result = self
1764                    .read_property_from_device(
1765                        req.device_instance,
1766                        req.object_identifier,
1767                        req.property_identifier,
1768                        req.property_array_index,
1769                    )
1770                    .await;
1771                DeviceReadResult {
1772                    device_instance: req.device_instance,
1773                    result,
1774                }
1775            })
1776            .buffer_unordered(concurrency)
1777            .collect()
1778            .await
1779    }
1780
1781    /// Read multiple properties from multiple devices concurrently (RPM batch).
1782    ///
1783    /// Sends an RPM to each device concurrently. This is the most efficient
1784    /// way to poll many properties across many devices — RPM batches within
1785    /// a single device, and this method batches across devices.
1786    pub async fn read_property_multiple_from_devices(
1787        &self,
1788        requests: Vec<DeviceRpmRequest>,
1789        max_concurrent: Option<usize>,
1790    ) -> Vec<DeviceRpmResult> {
1791        use futures_util::stream::{self, StreamExt};
1792
1793        let concurrency = max_concurrent.unwrap_or(DEFAULT_BATCH_CONCURRENCY);
1794
1795        stream::iter(requests)
1796            .map(|req| async move {
1797                let result = self
1798                    .read_property_multiple_from_device(req.device_instance, req.specs)
1799                    .await;
1800                DeviceRpmResult {
1801                    device_instance: req.device_instance,
1802                    result,
1803                }
1804            })
1805            .buffer_unordered(concurrency)
1806            .collect()
1807            .await
1808    }
1809
1810    /// Write a property on multiple devices concurrently.
1811    ///
1812    /// All writes are dispatched concurrently (up to `max_concurrent`,
1813    /// default 32). Results are returned in completion order.
1814    pub async fn write_property_to_devices(
1815        &self,
1816        requests: Vec<DeviceWriteRequest>,
1817        max_concurrent: Option<usize>,
1818    ) -> Vec<DeviceWriteResult> {
1819        use futures_util::stream::{self, StreamExt};
1820
1821        let concurrency = max_concurrent.unwrap_or(DEFAULT_BATCH_CONCURRENCY);
1822
1823        stream::iter(requests)
1824            .map(|req| async move {
1825                let result = self
1826                    .write_property_to_device(
1827                        req.device_instance,
1828                        req.object_identifier,
1829                        req.property_identifier,
1830                        req.property_array_index,
1831                        req.property_value,
1832                        req.priority,
1833                    )
1834                    .await;
1835                DeviceWriteResult {
1836                    device_instance: req.device_instance,
1837                    result,
1838                }
1839            })
1840            .buffer_unordered(concurrency)
1841            .collect()
1842            .await
1843    }
1844
1845    /// Send a WhoIs broadcast to discover devices.
1846    pub async fn who_is(
1847        &self,
1848        low_limit: Option<u32>,
1849        high_limit: Option<u32>,
1850    ) -> Result<(), Error> {
1851        use bacnet_services::who_is::WhoIsRequest;
1852
1853        let request = WhoIsRequest {
1854            low_limit,
1855            high_limit,
1856        };
1857        let mut buf = BytesMut::new();
1858        request.encode(&mut buf);
1859
1860        self.broadcast_global_unconfirmed(UnconfirmedServiceChoice::WHO_IS, &buf)
1861            .await
1862    }
1863
1864    /// Send a directed (unicast) WhoIs to a specific device.
1865    pub async fn who_is_directed(
1866        &self,
1867        destination_mac: &[u8],
1868        low_limit: Option<u32>,
1869        high_limit: Option<u32>,
1870    ) -> Result<(), Error> {
1871        use bacnet_services::who_is::WhoIsRequest;
1872
1873        let request = WhoIsRequest {
1874            low_limit,
1875            high_limit,
1876        };
1877        let mut buf = BytesMut::new();
1878        request.encode(&mut buf);
1879
1880        self.unconfirmed_request(destination_mac, UnconfirmedServiceChoice::WHO_IS, &buf)
1881            .await
1882    }
1883
1884    /// Send a WhoIs broadcast to a specific remote network.
1885    pub async fn who_is_network(
1886        &self,
1887        dest_network: u16,
1888        low_limit: Option<u32>,
1889        high_limit: Option<u32>,
1890    ) -> Result<(), Error> {
1891        use bacnet_services::who_is::WhoIsRequest;
1892
1893        let request = WhoIsRequest {
1894            low_limit,
1895            high_limit,
1896        };
1897        let mut buf = BytesMut::new();
1898        request.encode(&mut buf);
1899
1900        self.broadcast_network_unconfirmed(UnconfirmedServiceChoice::WHO_IS, &buf, dest_network)
1901            .await
1902    }
1903
1904    /// Send a WhoHas broadcast to find an object by identifier or name.
1905    pub async fn who_has(
1906        &self,
1907        object: bacnet_services::who_has::WhoHasObject,
1908        low_limit: Option<u32>,
1909        high_limit: Option<u32>,
1910    ) -> Result<(), Error> {
1911        use bacnet_services::who_has::WhoHasRequest;
1912
1913        let request = WhoHasRequest {
1914            low_limit,
1915            high_limit,
1916            object,
1917        };
1918        let mut buf = BytesMut::new();
1919        request.encode(&mut buf)?;
1920
1921        self.broadcast_unconfirmed(UnconfirmedServiceChoice::WHO_HAS, &buf)
1922            .await
1923    }
1924
1925    /// Subscribe to COV notifications for an object on a remote device.
1926    pub async fn subscribe_cov(
1927        &self,
1928        destination_mac: &[u8],
1929        subscriber_process_identifier: u32,
1930        monitored_object_identifier: bacnet_types::primitives::ObjectIdentifier,
1931        confirmed: bool,
1932        lifetime: Option<u32>,
1933    ) -> Result<(), Error> {
1934        use bacnet_services::cov::SubscribeCOVRequest;
1935
1936        let request = SubscribeCOVRequest {
1937            subscriber_process_identifier,
1938            monitored_object_identifier,
1939            issue_confirmed_notifications: Some(confirmed),
1940            lifetime,
1941        };
1942        let mut buf = BytesMut::new();
1943        request.encode(&mut buf);
1944
1945        let _ = self
1946            .confirmed_request(destination_mac, ConfirmedServiceChoice::SUBSCRIBE_COV, &buf)
1947            .await?;
1948
1949        Ok(())
1950    }
1951
1952    /// Cancel a COV subscription on a remote device.
1953    pub async fn unsubscribe_cov(
1954        &self,
1955        destination_mac: &[u8],
1956        subscriber_process_identifier: u32,
1957        monitored_object_identifier: bacnet_types::primitives::ObjectIdentifier,
1958    ) -> Result<(), Error> {
1959        use bacnet_services::cov::SubscribeCOVRequest;
1960
1961        let request = SubscribeCOVRequest {
1962            subscriber_process_identifier,
1963            monitored_object_identifier,
1964            issue_confirmed_notifications: None,
1965            lifetime: None,
1966        };
1967        let mut buf = BytesMut::new();
1968        request.encode(&mut buf);
1969
1970        let _ = self
1971            .confirmed_request(destination_mac, ConfirmedServiceChoice::SUBSCRIBE_COV, &buf)
1972            .await?;
1973
1974        Ok(())
1975    }
1976
1977    /// Delete an object on a remote device.
1978    pub async fn delete_object(
1979        &self,
1980        destination_mac: &[u8],
1981        object_identifier: bacnet_types::primitives::ObjectIdentifier,
1982    ) -> Result<(), Error> {
1983        use bacnet_services::object_mgmt::DeleteObjectRequest;
1984
1985        let request = DeleteObjectRequest { object_identifier };
1986        let mut buf = BytesMut::new();
1987        request.encode(&mut buf);
1988
1989        let _ = self
1990            .confirmed_request(destination_mac, ConfirmedServiceChoice::DELETE_OBJECT, &buf)
1991            .await?;
1992
1993        Ok(())
1994    }
1995
1996    /// Create an object on a remote device.
1997    pub async fn create_object(
1998        &self,
1999        destination_mac: &[u8],
2000        object_specifier: bacnet_services::object_mgmt::ObjectSpecifier,
2001        initial_values: Vec<bacnet_services::common::BACnetPropertyValue>,
2002    ) -> Result<Bytes, Error> {
2003        use bacnet_services::object_mgmt::CreateObjectRequest;
2004
2005        let request = CreateObjectRequest {
2006            object_specifier,
2007            list_of_initial_values: initial_values,
2008        };
2009        let mut buf = BytesMut::new();
2010        request.encode(&mut buf);
2011
2012        self.confirmed_request(destination_mac, ConfirmedServiceChoice::CREATE_OBJECT, &buf)
2013            .await
2014    }
2015
2016    /// Send DeviceCommunicationControl to a remote device.
2017    pub async fn device_communication_control(
2018        &self,
2019        destination_mac: &[u8],
2020        enable_disable: bacnet_types::enums::EnableDisable,
2021        time_duration: Option<u16>,
2022        password: Option<String>,
2023    ) -> Result<(), Error> {
2024        use bacnet_services::device_mgmt::DeviceCommunicationControlRequest;
2025
2026        let request = DeviceCommunicationControlRequest {
2027            time_duration,
2028            enable_disable,
2029            password,
2030        };
2031        let mut buf = BytesMut::new();
2032        request.encode(&mut buf)?;
2033
2034        let _ = self
2035            .confirmed_request(
2036                destination_mac,
2037                ConfirmedServiceChoice::DEVICE_COMMUNICATION_CONTROL,
2038                &buf,
2039            )
2040            .await?;
2041
2042        Ok(())
2043    }
2044
2045    /// Send ReinitializeDevice to a remote device.
2046    pub async fn reinitialize_device(
2047        &self,
2048        destination_mac: &[u8],
2049        reinitialized_state: bacnet_types::enums::ReinitializedState,
2050        password: Option<String>,
2051    ) -> Result<(), Error> {
2052        use bacnet_services::device_mgmt::ReinitializeDeviceRequest;
2053
2054        let request = ReinitializeDeviceRequest {
2055            reinitialized_state,
2056            password,
2057        };
2058        let mut buf = BytesMut::new();
2059        request.encode(&mut buf)?;
2060
2061        let _ = self
2062            .confirmed_request(
2063                destination_mac,
2064                ConfirmedServiceChoice::REINITIALIZE_DEVICE,
2065                &buf,
2066            )
2067            .await?;
2068
2069        Ok(())
2070    }
2071
2072    /// Get event information from a remote device.
2073    pub async fn get_event_information(
2074        &self,
2075        destination_mac: &[u8],
2076        last_received_object_identifier: Option<bacnet_types::primitives::ObjectIdentifier>,
2077    ) -> Result<Bytes, Error> {
2078        use bacnet_services::alarm_event::GetEventInformationRequest;
2079
2080        let request = GetEventInformationRequest {
2081            last_received_object_identifier,
2082        };
2083        let mut buf = BytesMut::new();
2084        request.encode(&mut buf);
2085
2086        self.confirmed_request(
2087            destination_mac,
2088            ConfirmedServiceChoice::GET_EVENT_INFORMATION,
2089            &buf,
2090        )
2091        .await
2092    }
2093
2094    /// Acknowledge an alarm on a remote device.
2095    pub async fn acknowledge_alarm(
2096        &self,
2097        destination_mac: &[u8],
2098        acknowledging_process_identifier: u32,
2099        event_object_identifier: bacnet_types::primitives::ObjectIdentifier,
2100        event_state_acknowledged: u32,
2101        acknowledgment_source: &str,
2102    ) -> Result<(), Error> {
2103        use bacnet_services::alarm_event::AcknowledgeAlarmRequest;
2104
2105        let request = AcknowledgeAlarmRequest {
2106            acknowledging_process_identifier,
2107            event_object_identifier,
2108            event_state_acknowledged,
2109            timestamp: bacnet_types::primitives::BACnetTimeStamp::SequenceNumber(0),
2110            acknowledgment_source: acknowledgment_source.to_string(),
2111            time_of_acknowledgment: bacnet_types::primitives::BACnetTimeStamp::SequenceNumber(0),
2112        };
2113        let mut buf = BytesMut::new();
2114        request.encode(&mut buf)?;
2115
2116        let _ = self
2117            .confirmed_request(
2118                destination_mac,
2119                ConfirmedServiceChoice::ACKNOWLEDGE_ALARM,
2120                &buf,
2121            )
2122            .await?;
2123
2124        Ok(())
2125    }
2126
2127    /// Read a range of items from a list or log-buffer property.
2128    pub async fn read_range(
2129        &self,
2130        destination_mac: &[u8],
2131        object_identifier: bacnet_types::primitives::ObjectIdentifier,
2132        property_identifier: bacnet_types::enums::PropertyIdentifier,
2133        property_array_index: Option<u32>,
2134        range: Option<bacnet_services::read_range::RangeSpec>,
2135    ) -> Result<bacnet_services::read_range::ReadRangeAck, Error> {
2136        use bacnet_services::read_range::{ReadRangeAck, ReadRangeRequest};
2137
2138        let request = ReadRangeRequest {
2139            object_identifier,
2140            property_identifier,
2141            property_array_index,
2142            range,
2143        };
2144        let mut buf = BytesMut::new();
2145        request.encode(&mut buf);
2146
2147        let response_data = self
2148            .confirmed_request(destination_mac, ConfirmedServiceChoice::READ_RANGE, &buf)
2149            .await?;
2150
2151        ReadRangeAck::decode(&response_data)
2152    }
2153
2154    /// Read file data from a remote device (stream or record access).
2155    pub async fn atomic_read_file(
2156        &self,
2157        destination_mac: &[u8],
2158        file_identifier: bacnet_types::primitives::ObjectIdentifier,
2159        access: bacnet_services::file::FileAccessMethod,
2160    ) -> Result<Bytes, Error> {
2161        use bacnet_services::file::AtomicReadFileRequest;
2162
2163        let request = AtomicReadFileRequest {
2164            file_identifier,
2165            access,
2166        };
2167        let mut buf = BytesMut::new();
2168        request.encode(&mut buf);
2169
2170        self.confirmed_request(
2171            destination_mac,
2172            ConfirmedServiceChoice::ATOMIC_READ_FILE,
2173            &buf,
2174        )
2175        .await
2176    }
2177
2178    /// Write file data to a remote device (stream or record access).
2179    pub async fn atomic_write_file(
2180        &self,
2181        destination_mac: &[u8],
2182        file_identifier: bacnet_types::primitives::ObjectIdentifier,
2183        access: bacnet_services::file::FileWriteAccessMethod,
2184    ) -> Result<Bytes, Error> {
2185        use bacnet_services::file::AtomicWriteFileRequest;
2186
2187        let request = AtomicWriteFileRequest {
2188            file_identifier,
2189            access,
2190        };
2191        let mut buf = BytesMut::new();
2192        request.encode(&mut buf);
2193
2194        self.confirmed_request(
2195            destination_mac,
2196            ConfirmedServiceChoice::ATOMIC_WRITE_FILE,
2197            &buf,
2198        )
2199        .await
2200    }
2201
2202    /// Add elements to a list property on a remote device.
2203    pub async fn add_list_element(
2204        &self,
2205        destination_mac: &[u8],
2206        object_identifier: bacnet_types::primitives::ObjectIdentifier,
2207        property_identifier: bacnet_types::enums::PropertyIdentifier,
2208        property_array_index: Option<u32>,
2209        list_of_elements: Vec<u8>,
2210    ) -> Result<(), Error> {
2211        use bacnet_services::list_manipulation::ListElementRequest;
2212
2213        let request = ListElementRequest {
2214            object_identifier,
2215            property_identifier,
2216            property_array_index,
2217            list_of_elements,
2218        };
2219        let mut buf = BytesMut::new();
2220        request.encode(&mut buf);
2221
2222        let _ = self
2223            .confirmed_request(
2224                destination_mac,
2225                ConfirmedServiceChoice::ADD_LIST_ELEMENT,
2226                &buf,
2227            )
2228            .await?;
2229
2230        Ok(())
2231    }
2232
2233    /// Remove elements from a list property on a remote device.
2234    pub async fn remove_list_element(
2235        &self,
2236        destination_mac: &[u8],
2237        object_identifier: bacnet_types::primitives::ObjectIdentifier,
2238        property_identifier: bacnet_types::enums::PropertyIdentifier,
2239        property_array_index: Option<u32>,
2240        list_of_elements: Vec<u8>,
2241    ) -> Result<(), Error> {
2242        use bacnet_services::list_manipulation::ListElementRequest;
2243
2244        let request = ListElementRequest {
2245            object_identifier,
2246            property_identifier,
2247            property_array_index,
2248            list_of_elements,
2249        };
2250        let mut buf = BytesMut::new();
2251        request.encode(&mut buf);
2252
2253        let _ = self
2254            .confirmed_request(
2255                destination_mac,
2256                ConfirmedServiceChoice::REMOVE_LIST_ELEMENT,
2257                &buf,
2258            )
2259            .await?;
2260
2261        Ok(())
2262    }
2263
2264    /// Send a TimeSynchronization request (unconfirmed, no response expected).
2265    pub async fn time_synchronization(
2266        &self,
2267        destination_mac: &[u8],
2268        date: bacnet_types::primitives::Date,
2269        time: bacnet_types::primitives::Time,
2270    ) -> Result<(), Error> {
2271        use bacnet_services::device_mgmt::TimeSynchronizationRequest;
2272
2273        let request = TimeSynchronizationRequest { date, time };
2274        let mut buf = BytesMut::new();
2275        request.encode(&mut buf);
2276
2277        self.unconfirmed_request(
2278            destination_mac,
2279            UnconfirmedServiceChoice::TIME_SYNCHRONIZATION,
2280            &buf,
2281        )
2282        .await
2283    }
2284
2285    /// Send a UTCTimeSynchronization request (unconfirmed, no response expected).
2286    pub async fn utc_time_synchronization(
2287        &self,
2288        destination_mac: &[u8],
2289        date: bacnet_types::primitives::Date,
2290        time: bacnet_types::primitives::Time,
2291    ) -> Result<(), Error> {
2292        use bacnet_services::device_mgmt::TimeSynchronizationRequest;
2293
2294        let request = TimeSynchronizationRequest { date, time };
2295        let mut buf = BytesMut::new();
2296        request.encode(&mut buf);
2297
2298        self.unconfirmed_request(
2299            destination_mac,
2300            UnconfirmedServiceChoice::UTC_TIME_SYNCHRONIZATION,
2301            &buf,
2302        )
2303        .await
2304    }
2305
2306    /// Get a receiver for incoming COV notifications. Each call returns a new
2307    /// independent receiver.
2308    pub fn cov_notifications(&self) -> broadcast::Receiver<COVNotificationRequest> {
2309        self.cov_tx.subscribe()
2310    }
2311
2312    /// Get a snapshot of all discovered devices.
2313    pub async fn discovered_devices(&self) -> Vec<DiscoveredDevice> {
2314        self.device_table.lock().await.all()
2315    }
2316
2317    /// Look up a discovered device by instance number.
2318    pub async fn get_device(&self, instance: u32) -> Option<DiscoveredDevice> {
2319        self.device_table.lock().await.get(instance).cloned()
2320    }
2321
2322    /// Clear the discovered devices table.
2323    pub async fn clear_devices(&self) {
2324        self.device_table.lock().await.clear();
2325    }
2326
2327    /// Manually register a device in the device table.
2328    ///
2329    /// Useful for adding known devices without requiring WhoIs/IAm exchange.
2330    /// Sets default values for max_apdu_length (1476), segmentation (NONE),
2331    /// and vendor_id (0) since these are unknown without IAm.
2332    pub async fn add_device(&self, instance: u32, mac: &[u8]) -> Result<(), Error> {
2333        let oid = bacnet_types::primitives::ObjectIdentifier::new(
2334            bacnet_types::enums::ObjectType::DEVICE,
2335            instance,
2336        )?;
2337        let device = DiscoveredDevice {
2338            object_identifier: oid,
2339            mac_address: MacAddr::from_slice(mac),
2340            max_apdu_length: 1476,
2341            segmentation_supported: bacnet_types::enums::Segmentation::NONE,
2342            max_segments_accepted: None,
2343            vendor_id: 0,
2344            last_seen: std::time::Instant::now(),
2345            source_network: None,
2346            source_address: None,
2347        };
2348        self.device_table.lock().await.upsert(device);
2349        Ok(())
2350    }
2351
2352    /// Stop the client, aborting the dispatch task.
2353    pub async fn stop(&mut self) -> Result<(), Error> {
2354        if let Some(task) = self.dispatch_task.take() {
2355            task.abort();
2356            let _ = task.await;
2357        }
2358        Ok(())
2359    }
2360}
2361
2362#[cfg(test)]
2363mod tests {
2364    use super::*;
2365    use bacnet_encoding::apdu::{ComplexAck, SimpleAck};
2366    use std::net::Ipv4Addr;
2367    use tokio::time::Duration;
2368
2369    async fn make_client() -> BACnetClient<BipTransport> {
2370        BACnetClient::builder()
2371            .interface(Ipv4Addr::LOCALHOST)
2372            .port(0)
2373            .apdu_timeout_ms(2000)
2374            .build()
2375            .await
2376            .unwrap()
2377    }
2378
2379    #[tokio::test]
2380    async fn client_start_stop() {
2381        let mut client = make_client().await;
2382        assert!(!client.local_mac().is_empty());
2383        client.stop().await.unwrap();
2384    }
2385
2386    #[tokio::test]
2387    async fn confirmed_request_simple_ack() {
2388        let mut client_a = make_client().await;
2389
2390        let transport_b = BipTransport::new(Ipv4Addr::LOCALHOST, 0, Ipv4Addr::BROADCAST);
2391        let mut net_b = NetworkLayer::new(transport_b);
2392        let mut rx_b = net_b.start().await.unwrap();
2393        let b_mac = net_b.local_mac().to_vec();
2394
2395        let b_handle = tokio::spawn(async move {
2396            let received = timeout(Duration::from_secs(2), rx_b.recv())
2397                .await
2398                .expect("B timed out")
2399                .expect("B channel closed");
2400
2401            let decoded = apdu::decode_apdu(received.apdu.clone()).unwrap();
2402            if let Apdu::ConfirmedRequest(req) = decoded {
2403                let ack = Apdu::SimpleAck(SimpleAck {
2404                    invoke_id: req.invoke_id,
2405                    service_choice: req.service_choice,
2406                });
2407                let mut buf = BytesMut::new();
2408                encode_apdu(&mut buf, &ack);
2409                net_b
2410                    .send_apdu(&buf, &received.source_mac, false, NetworkPriority::NORMAL)
2411                    .await
2412                    .unwrap();
2413            }
2414            net_b.stop().await.unwrap();
2415        });
2416
2417        let result = client_a
2418            .confirmed_request(
2419                &b_mac,
2420                ConfirmedServiceChoice::WRITE_PROPERTY,
2421                &[0x01, 0x02],
2422            )
2423            .await;
2424
2425        assert!(result.is_ok());
2426        let response = result.unwrap();
2427        assert!(response.is_empty());
2428
2429        b_handle.await.unwrap();
2430        client_a.stop().await.unwrap();
2431    }
2432
2433    #[tokio::test]
2434    async fn confirmed_request_complex_ack() {
2435        let mut client_a = make_client().await;
2436
2437        let transport_b = BipTransport::new(Ipv4Addr::LOCALHOST, 0, Ipv4Addr::BROADCAST);
2438        let mut net_b = NetworkLayer::new(transport_b);
2439        let mut rx_b = net_b.start().await.unwrap();
2440        let b_mac = net_b.local_mac().to_vec();
2441
2442        let b_handle = tokio::spawn(async move {
2443            let received = timeout(Duration::from_secs(2), rx_b.recv())
2444                .await
2445                .unwrap()
2446                .unwrap();
2447
2448            let decoded = apdu::decode_apdu(received.apdu.clone()).unwrap();
2449            if let Apdu::ConfirmedRequest(req) = decoded {
2450                let ack = Apdu::ComplexAck(ComplexAck {
2451                    segmented: false,
2452                    more_follows: false,
2453                    invoke_id: req.invoke_id,
2454                    sequence_number: None,
2455                    proposed_window_size: None,
2456                    service_choice: req.service_choice,
2457                    service_ack: Bytes::from_static(&[0xDE, 0xAD, 0xBE, 0xEF]),
2458                });
2459                let mut buf = BytesMut::new();
2460                encode_apdu(&mut buf, &ack);
2461                net_b
2462                    .send_apdu(&buf, &received.source_mac, false, NetworkPriority::NORMAL)
2463                    .await
2464                    .unwrap();
2465            }
2466            net_b.stop().await.unwrap();
2467        });
2468
2469        let result = client_a
2470            .confirmed_request(&b_mac, ConfirmedServiceChoice::READ_PROPERTY, &[0x01])
2471            .await;
2472
2473        assert!(result.is_ok());
2474        assert_eq!(result.unwrap(), vec![0xDE, 0xAD, 0xBE, 0xEF]);
2475
2476        b_handle.await.unwrap();
2477        client_a.stop().await.unwrap();
2478    }
2479
2480    #[tokio::test]
2481    async fn confirmed_request_timeout() {
2482        let mut client = make_client().await;
2483        let fake_mac = vec![10, 99, 99, 99, 0xBA, 0xC0];
2484        let result = client
2485            .confirmed_request(&fake_mac, ConfirmedServiceChoice::READ_PROPERTY, &[0x01])
2486            .await;
2487        assert!(result.is_err());
2488        client.stop().await.unwrap();
2489    }
2490
2491    #[tokio::test]
2492    async fn segmented_complex_ack_reassembly() {
2493        let mut client = make_client().await;
2494
2495        let transport_b = BipTransport::new(Ipv4Addr::LOCALHOST, 0, Ipv4Addr::BROADCAST);
2496        let mut net_b = NetworkLayer::new(transport_b);
2497        let mut rx_b = net_b.start().await.unwrap();
2498        let b_mac = net_b.local_mac().to_vec();
2499
2500        let b_handle = tokio::spawn(async move {
2501            let received = timeout(Duration::from_secs(2), rx_b.recv())
2502                .await
2503                .unwrap()
2504                .unwrap();
2505
2506            let decoded = apdu::decode_apdu(received.apdu.clone()).unwrap();
2507            let invoke_id = if let Apdu::ConfirmedRequest(req) = decoded {
2508                req.invoke_id
2509            } else {
2510                panic!("Expected ConfirmedRequest");
2511            };
2512
2513            let service_choice = ConfirmedServiceChoice::READ_PROPERTY;
2514            let segments: Vec<Bytes> = vec![
2515                Bytes::from_static(&[0x01, 0x02, 0x03]),
2516                Bytes::from_static(&[0x04, 0x05, 0x06]),
2517                Bytes::from_static(&[0x07, 0x08]),
2518            ];
2519
2520            for (i, seg) in segments.iter().enumerate() {
2521                let is_last = i == segments.len() - 1;
2522                let ack = Apdu::ComplexAck(ComplexAck {
2523                    segmented: true,
2524                    more_follows: !is_last,
2525                    invoke_id,
2526                    sequence_number: Some(i as u8),
2527                    proposed_window_size: Some(1),
2528                    service_choice,
2529                    service_ack: seg.clone(),
2530                });
2531                let mut buf = BytesMut::new();
2532                encode_apdu(&mut buf, &ack);
2533                net_b
2534                    .send_apdu(&buf, &received.source_mac, false, NetworkPriority::NORMAL)
2535                    .await
2536                    .unwrap();
2537
2538                let seg_ack_msg = timeout(Duration::from_secs(2), rx_b.recv())
2539                    .await
2540                    .unwrap()
2541                    .unwrap();
2542                let decoded = apdu::decode_apdu(seg_ack_msg.apdu.clone()).unwrap();
2543                if let Apdu::SegmentAck(sa) = decoded {
2544                    assert_eq!(sa.invoke_id, invoke_id);
2545                    assert_eq!(sa.sequence_number, i as u8);
2546                } else {
2547                    panic!("Expected SegmentAck, got {:?}", decoded);
2548                }
2549            }
2550
2551            net_b.stop().await.unwrap();
2552        });
2553
2554        let result = client
2555            .confirmed_request(&b_mac, ConfirmedServiceChoice::READ_PROPERTY, &[0x01])
2556            .await;
2557
2558        assert!(result.is_ok());
2559        assert_eq!(
2560            result.unwrap(),
2561            vec![0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08]
2562        );
2563
2564        b_handle.await.unwrap();
2565        client.stop().await.unwrap();
2566    }
2567
2568    #[tokio::test]
2569    async fn segmented_confirmed_request_sends_segments() {
2570        let mut client = BACnetClient::builder()
2571            .interface(Ipv4Addr::LOCALHOST)
2572            .port(0)
2573            .apdu_timeout_ms(5000)
2574            .max_apdu_length(50)
2575            .build()
2576            .await
2577            .unwrap();
2578
2579        let transport_b = BipTransport::new(Ipv4Addr::LOCALHOST, 0, Ipv4Addr::BROADCAST);
2580        let mut net_b = NetworkLayer::new(transport_b);
2581        let mut rx_b = net_b.start().await.unwrap();
2582        let b_mac = net_b.local_mac().to_vec();
2583
2584        let service_data: Vec<u8> = (0u8..100).collect();
2585        let expected_data = service_data.clone();
2586
2587        let b_handle = tokio::spawn(async move {
2588            let mut all_service_data = Vec::new();
2589            let mut client_mac;
2590            let mut invoke_id;
2591
2592            loop {
2593                let received = timeout(Duration::from_secs(3), rx_b.recv())
2594                    .await
2595                    .expect("server timed out waiting for segment")
2596                    .expect("server channel closed");
2597
2598                let decoded = apdu::decode_apdu(received.apdu.clone()).unwrap();
2599                if let Apdu::ConfirmedRequest(req) = decoded {
2600                    assert!(req.segmented, "expected segmented request");
2601                    invoke_id = req.invoke_id;
2602                    client_mac = received.source_mac.clone();
2603                    let seq = req.sequence_number.unwrap();
2604                    all_service_data.extend_from_slice(&req.service_request);
2605
2606                    let seg_ack = Apdu::SegmentAck(SegmentAckPdu {
2607                        negative_ack: false,
2608                        sent_by_server: true,
2609                        invoke_id,
2610                        sequence_number: seq,
2611                        actual_window_size: 1,
2612                    });
2613                    let mut buf = BytesMut::new();
2614                    encode_apdu(&mut buf, &seg_ack);
2615                    net_b
2616                        .send_apdu(&buf, &received.source_mac, false, NetworkPriority::NORMAL)
2617                        .await
2618                        .unwrap();
2619
2620                    if !req.more_follows {
2621                        break;
2622                    }
2623                } else {
2624                    panic!("Expected ConfirmedRequest, got {:?}", decoded);
2625                }
2626            }
2627
2628            let ack = Apdu::SimpleAck(SimpleAck {
2629                invoke_id,
2630                service_choice: ConfirmedServiceChoice::WRITE_PROPERTY,
2631            });
2632            let mut buf = BytesMut::new();
2633            encode_apdu(&mut buf, &ack);
2634            net_b
2635                .send_apdu(&buf, &client_mac, false, NetworkPriority::NORMAL)
2636                .await
2637                .unwrap();
2638
2639            net_b.stop().await.unwrap();
2640            all_service_data
2641        });
2642
2643        let result = client
2644            .confirmed_request(
2645                &b_mac,
2646                ConfirmedServiceChoice::WRITE_PROPERTY,
2647                &service_data,
2648            )
2649            .await;
2650
2651        assert!(result.is_ok());
2652        assert!(result.unwrap().is_empty());
2653
2654        let received_data = b_handle.await.unwrap();
2655        assert_eq!(received_data, expected_data);
2656
2657        client.stop().await.unwrap();
2658    }
2659
2660    #[tokio::test]
2661    async fn segmented_request_with_complex_ack_response() {
2662        let mut client = BACnetClient::builder()
2663            .interface(Ipv4Addr::LOCALHOST)
2664            .port(0)
2665            .apdu_timeout_ms(5000)
2666            .max_apdu_length(50)
2667            .build()
2668            .await
2669            .unwrap();
2670
2671        let transport_b = BipTransport::new(Ipv4Addr::LOCALHOST, 0, Ipv4Addr::BROADCAST);
2672        let mut net_b = NetworkLayer::new(transport_b);
2673        let mut rx_b = net_b.start().await.unwrap();
2674        let b_mac = net_b.local_mac().to_vec();
2675
2676        let service_data: Vec<u8> = (0u8..60).collect();
2677
2678        let b_handle = tokio::spawn(async move {
2679            let mut client_mac;
2680            let mut invoke_id;
2681
2682            loop {
2683                let received = timeout(Duration::from_secs(3), rx_b.recv())
2684                    .await
2685                    .unwrap()
2686                    .unwrap();
2687
2688                let decoded = apdu::decode_apdu(received.apdu.clone()).unwrap();
2689                if let Apdu::ConfirmedRequest(req) = decoded {
2690                    invoke_id = req.invoke_id;
2691                    client_mac = received.source_mac.clone();
2692                    let seq = req.sequence_number.unwrap();
2693
2694                    let seg_ack = Apdu::SegmentAck(SegmentAckPdu {
2695                        negative_ack: false,
2696                        sent_by_server: true,
2697                        invoke_id,
2698                        sequence_number: seq,
2699                        actual_window_size: 1,
2700                    });
2701                    let mut buf = BytesMut::new();
2702                    encode_apdu(&mut buf, &seg_ack);
2703                    net_b
2704                        .send_apdu(&buf, &received.source_mac, false, NetworkPriority::NORMAL)
2705                        .await
2706                        .unwrap();
2707
2708                    if !req.more_follows {
2709                        break;
2710                    }
2711                }
2712            }
2713
2714            let ack = Apdu::ComplexAck(ComplexAck {
2715                segmented: false,
2716                more_follows: false,
2717                invoke_id,
2718                sequence_number: None,
2719                proposed_window_size: None,
2720                service_choice: ConfirmedServiceChoice::READ_PROPERTY,
2721                service_ack: Bytes::from_static(&[0xCA, 0xFE]),
2722            });
2723            let mut buf = BytesMut::new();
2724            encode_apdu(&mut buf, &ack);
2725            net_b
2726                .send_apdu(&buf, &client_mac, false, NetworkPriority::NORMAL)
2727                .await
2728                .unwrap();
2729
2730            net_b.stop().await.unwrap();
2731        });
2732
2733        let result = client
2734            .confirmed_request(&b_mac, ConfirmedServiceChoice::READ_PROPERTY, &service_data)
2735            .await;
2736
2737        assert!(result.is_ok());
2738        assert_eq!(result.unwrap(), vec![0xCA, 0xFE]);
2739
2740        b_handle.await.unwrap();
2741        client.stop().await.unwrap();
2742    }
2743
2744    #[tokio::test]
2745    async fn segment_overflow_guard() {
2746        let mut client = BACnetClient::builder()
2747            .interface(Ipv4Addr::LOCALHOST)
2748            .port(0)
2749            .apdu_timeout_ms(2000)
2750            .max_apdu_length(50)
2751            .build()
2752            .await
2753            .unwrap();
2754
2755        let huge_payload = vec![0u8; 257 * 44];
2756        let fake_mac = vec![10, 99, 99, 99, 0xBA, 0xC0];
2757
2758        let result = client
2759            .confirmed_request(
2760                &fake_mac,
2761                ConfirmedServiceChoice::READ_PROPERTY,
2762                &huge_payload,
2763            )
2764            .await;
2765
2766        assert!(
2767            result.is_err(),
2768            "expected error for oversized payload, got success"
2769        );
2770
2771        client.stop().await.unwrap();
2772    }
2773
2774    #[test]
2775    fn seg_receiver_timeout_is_4s() {
2776        assert_eq!(SEG_RECEIVER_TIMEOUT, Duration::from_secs(4));
2777    }
2778}