Skip to main content

bacnet_client/
client.rs

1//! BACnetClient: high-level and low-level request APIs.
2//!
3//! The client owns a NetworkLayer, spawns an APDU dispatch task, and provides
4//! methods for sending confirmed and unconfirmed BACnet requests.
5
6use std::collections::HashMap;
7use std::net::Ipv4Addr;
8use std::sync::Arc;
9use std::time::Instant;
10
11use bytes::{Bytes, BytesMut};
12use tokio::sync::{broadcast, mpsc, Mutex};
13use tokio::task::JoinHandle;
14use tokio::time::{timeout, Duration};
15use tracing::{debug, warn};
16
17use bacnet_encoding::apdu::{
18    self, encode_apdu, Apdu, ConfirmedRequest as ConfirmedRequestPdu, SegmentAck as SegmentAckPdu,
19    SimpleAck,
20};
21use bacnet_network::layer::NetworkLayer;
22use bacnet_services::cov::COVNotificationRequest;
23use bacnet_transport::bip::BipTransport;
24use bacnet_transport::port::TransportPort;
25use bacnet_types::enums::{ConfirmedServiceChoice, NetworkPriority, UnconfirmedServiceChoice};
26use bacnet_types::error::Error;
27use bacnet_types::MacAddr;
28
29use crate::discovery::{DeviceTable, DiscoveredDevice};
30use crate::segmentation::{max_segment_payload, split_payload, SegmentReceiver, SegmentedPduType};
31use crate::tsm::{Tsm, TsmConfig, TsmResponse};
32
/// Settings used when starting a BACnet client.
///
/// The defaults (see the [`Default`] impl) bind every interface on UDP port
/// `0xBAC0` and use the limited broadcast address.
#[derive(Clone, Debug)]
pub struct ClientConfig {
    /// Local interface address to bind.
    pub interface: Ipv4Addr,
    /// UDP port to bind; `0` asks for an ephemeral port.
    pub port: u16,
    /// Directed broadcast address.
    pub broadcast_address: Ipv4Addr,
    /// APDU timeout, in milliseconds.
    pub apdu_timeout_ms: u64,
    /// Number of APDU retries.
    pub apdu_retries: u8,
    /// Largest APDU this client accepts.
    pub max_apdu_length: u16,
    /// Maximum number of segments this client accepts (`None` = unspecified).
    pub max_segments: Option<u8>,
    /// Whether this client accepts segmented responses.
    pub segmented_response_accepted: bool,
    /// Proposed window size for segmented transfers (1-127, default 1).
    pub proposed_window_size: u8,
}

impl Default for ClientConfig {
    /// Returns the stock configuration: all interfaces, port `0xBAC0`,
    /// limited broadcast, 6 s APDU timeout with 3 retries, 1476-byte APDUs,
    /// segmented responses accepted with a window size of 1.
    fn default() -> Self {
        // Addressing defaults.
        let interface = Ipv4Addr::UNSPECIFIED;
        let broadcast_address = Ipv4Addr::BROADCAST;

        Self {
            interface,
            port: 0xBAC0,
            broadcast_address,
            // Timing / retry defaults.
            apdu_timeout_ms: 6000,
            apdu_retries: 3,
            // Size and segmentation defaults.
            max_apdu_length: 1476,
            max_segments: None,
            segmented_response_accepted: true,
            proposed_window_size: 1,
        }
    }
}
71
72/// Generic builder for BACnetClient with a pre-built transport.
73pub struct ClientBuilder<T: TransportPort> {
74    config: ClientConfig,
75    transport: Option<T>,
76}
77
78impl<T: TransportPort + 'static> ClientBuilder<T> {
79    /// Set the pre-built transport.
80    pub fn transport(mut self, transport: T) -> Self {
81        self.transport = Some(transport);
82        self
83    }
84
85    /// Set APDU timeout in milliseconds.
86    pub fn apdu_timeout_ms(mut self, ms: u64) -> Self {
87        self.config.apdu_timeout_ms = ms;
88        self
89    }
90
91    /// Set the maximum APDU length this client accepts.
92    pub fn max_apdu_length(mut self, len: u16) -> Self {
93        self.config.max_apdu_length = len;
94        self
95    }
96
97    /// Build and start the client.
98    pub async fn build(self) -> Result<BACnetClient<T>, Error> {
99        let transport = self
100            .transport
101            .ok_or_else(|| Error::Encoding("transport not set on ClientBuilder".into()))?;
102        BACnetClient::start(self.config, transport).await
103    }
104}
105
106/// BIP-specific builder that constructs `BipTransport` from interface/port/broadcast fields.
107pub struct BipClientBuilder {
108    config: ClientConfig,
109}
110
111impl BipClientBuilder {
112    /// Set the local interface IP.
113    pub fn interface(mut self, ip: Ipv4Addr) -> Self {
114        self.config.interface = ip;
115        self
116    }
117
118    /// Set the UDP port (0 for ephemeral).
119    pub fn port(mut self, port: u16) -> Self {
120        self.config.port = port;
121        self
122    }
123
124    /// Set the directed broadcast address.
125    pub fn broadcast_address(mut self, addr: Ipv4Addr) -> Self {
126        self.config.broadcast_address = addr;
127        self
128    }
129
130    /// Set APDU timeout in milliseconds.
131    pub fn apdu_timeout_ms(mut self, ms: u64) -> Self {
132        self.config.apdu_timeout_ms = ms;
133        self
134    }
135
136    /// Set the maximum APDU length this client accepts.
137    pub fn max_apdu_length(mut self, len: u16) -> Self {
138        self.config.max_apdu_length = len;
139        self
140    }
141
142    /// Build and start the client, constructing a BipTransport from the config.
143    pub async fn build(self) -> Result<BACnetClient<BipTransport>, Error> {
144        let transport = BipTransport::new(
145            self.config.interface,
146            self.config.port,
147            self.config.broadcast_address,
148        );
149        BACnetClient::start(self.config, transport).await
150    }
151}
152
/// In-progress segmented receive state.
///
/// One instance exists per `(source_mac, invoke_id)` session while the
/// dispatch task is collecting the segments of a segmented ComplexAck.
struct SegmentedReceiveState {
    /// Collects the payload of every accepted segment until reassembly.
    receiver: SegmentReceiver,
    /// Next expected sequence number (for gap detection). Starts at 0 and is
    /// advanced with wrapping arithmetic after each accepted segment.
    expected_next_seq: u8,
    /// Timestamp of last received segment (for reaping stale sessions).
    last_activity: Instant,
}
161
/// Timeout for idle segmented reassembly sessions (Clause 9.1.6).
///
/// A session whose last segment arrived at least this long ago (4 seconds) is
/// dropped. The dispatch task only performs this check when another APDU is
/// received, so an idle session may linger until the next incoming APDU.
const SEG_RECEIVER_TIMEOUT: Duration = Duration::from_secs(4);

/// Key for tracking in-progress segmented receives: (source_mac, invoke_id).
///
/// The same key shape is used for the map of SegmentAck forwarding channels.
type SegKey = (MacAddr, u8);
167
/// BACnet client with low-level and high-level request APIs.
///
/// Created through one of the builders or directly via `BACnetClient::start`,
/// which also spawns the background APDU dispatch task.
pub struct BACnetClient<T: TransportPort> {
    /// Effective configuration; `max_apdu_length` has already been clamped to
    /// the transport's limit by `start`.
    config: ClientConfig,
    /// Network layer used for sending; shared with the dispatch task.
    network: Arc<NetworkLayer<T>>,
    /// Transaction state machine tracking outstanding confirmed requests;
    /// the dispatch task completes transactions on it.
    tsm: Arc<Mutex<Tsm>>,
    /// Devices learned from received I-Am messages.
    device_table: Arc<Mutex<DeviceTable>>,
    /// Broadcast channel on which decoded COV notifications are published.
    cov_tx: broadcast::Sender<COVNotificationRequest>,
    /// Handle of the spawned APDU dispatch task.
    // NOTE(review): presumably an `Option` so the handle can be taken on
    // shutdown — the code that does so is not visible here; confirm.
    dispatch_task: Option<JoinHandle<()>>,
    /// Channels, keyed by (peer MAC, invoke ID), to which the dispatch task
    /// forwards received SegmentAck PDUs.
    seg_ack_senders: Arc<Mutex<HashMap<SegKey, mpsc::Sender<SegmentAckPdu>>>>,
    /// Local MAC address read from the network layer at startup.
    local_mac: MacAddr,
}
179
180impl BACnetClient<BipTransport> {
181    /// Create a BIP-specific builder with interface/port/broadcast fields.
182    pub fn bip_builder() -> BipClientBuilder {
183        BipClientBuilder {
184            config: ClientConfig::default(),
185        }
186    }
187
188    /// Create a BIP-specific builder (alias for backward compatibility).
189    pub fn builder() -> BipClientBuilder {
190        Self::bip_builder()
191    }
192}
193
#[cfg(feature = "sc-tls")]
impl BACnetClient<bacnet_transport::sc::ScTransport<bacnet_transport::sc_tls::TlsWebSocket>> {
    /// Start configuring a client that connects to a BACnet/SC hub.
    ///
    /// The builder begins with the default [`ClientConfig`], an empty hub URL,
    /// no TLS configuration, an all-zero VMAC, a 30 000 ms heartbeat interval,
    /// a 60 000 ms heartbeat timeout and reconnection disabled.
    pub fn sc_builder() -> ScClientBuilder {
        let config = ClientConfig::default();
        let hub_url = String::new();
        ScClientBuilder {
            config,
            hub_url,
            tls_config: None,
            vmac: [0; 6],
            heartbeat_interval_ms: 30_000,
            heartbeat_timeout_ms: 60_000,
            reconnect: None,
        }
    }
}
209
/// SC-specific client builder.
///
/// Created by [`BACnetClient::sc_builder()`].  Requires the `sc-tls` feature.
///
/// Both a hub URL and a TLS configuration must be supplied before calling
/// [`ScClientBuilder::build`]; every other setting has a default.
#[cfg(feature = "sc-tls")]
pub struct ScClientBuilder {
    /// Client configuration passed to `BACnetClient::start`.
    config: ClientConfig,
    /// WebSocket URL of the BACnet/SC hub; empty until set.
    hub_url: String,
    /// TLS client configuration; required by `build`.
    tls_config: Option<std::sync::Arc<tokio_rustls::rustls::ClientConfig>>,
    /// Local VMAC address handed to the SC transport.
    vmac: bacnet_transport::sc_frame::Vmac,
    /// Heartbeat interval in milliseconds.
    heartbeat_interval_ms: u64,
    /// Heartbeat timeout in milliseconds.
    heartbeat_timeout_ms: u64,
    /// Optional automatic-reconnect configuration.
    reconnect: Option<bacnet_transport::sc::ScReconnectConfig>,
}

#[cfg(feature = "sc-tls")]
impl ScClientBuilder {
    /// Set the hub WebSocket URL (e.g. `wss://hub.example.com/bacnet`).
    pub fn hub_url(mut self, url: &str) -> Self {
        self.hub_url = url.to_string();
        self
    }

    /// Set the TLS client configuration.
    pub fn tls_config(
        mut self,
        config: std::sync::Arc<tokio_rustls::rustls::ClientConfig>,
    ) -> Self {
        self.tls_config = Some(config);
        self
    }

    /// Set the local VMAC address.
    pub fn vmac(mut self, vmac: [u8; 6]) -> Self {
        self.vmac = vmac;
        self
    }

    /// Set the APDU timeout in milliseconds.
    pub fn apdu_timeout_ms(mut self, ms: u64) -> Self {
        self.config.apdu_timeout_ms = ms;
        self
    }

    /// Set the maximum APDU length this client accepts.
    ///
    /// Mirrors the setter offered by the other client builders. The value is
    /// still clamped to the transport's limit when the client starts.
    pub fn max_apdu_length(mut self, len: u16) -> Self {
        self.config.max_apdu_length = len;
        self
    }

    /// Set the heartbeat interval in milliseconds (default 30 000).
    pub fn heartbeat_interval_ms(mut self, ms: u64) -> Self {
        self.heartbeat_interval_ms = ms;
        self
    }

    /// Set the heartbeat timeout in milliseconds (default 60 000).
    pub fn heartbeat_timeout_ms(mut self, ms: u64) -> Self {
        self.heartbeat_timeout_ms = ms;
        self
    }

    /// Enable automatic reconnection with the given configuration.
    pub fn reconnect(mut self, config: bacnet_transport::sc::ScReconnectConfig) -> Self {
        self.reconnect = Some(config);
        self
    }

    /// Connect to the hub and start the client.
    ///
    /// # Errors
    ///
    /// Returns `Error::Encoding` when the TLS configuration or the hub URL
    /// has not been set. Otherwise propagates any error from establishing the
    /// TLS WebSocket connection or from `BACnetClient::start`.
    pub async fn build(
        self,
    ) -> Result<
        BACnetClient<bacnet_transport::sc::ScTransport<bacnet_transport::sc_tls::TlsWebSocket>>,
        Error,
    > {
        let tls_config = self
            .tls_config
            .ok_or_else(|| Error::Encoding("SC client builder: tls_config is required".into()))?;

        // A never-set URL is a configuration mistake; report it explicitly
        // instead of letting the connection attempt fail on an empty string.
        if self.hub_url.is_empty() {
            return Err(Error::Encoding(
                "SC client builder: hub_url is required".into(),
            ));
        }

        let ws = bacnet_transport::sc_tls::TlsWebSocket::connect(&self.hub_url, tls_config).await?;

        let mut transport = bacnet_transport::sc::ScTransport::new(ws, self.vmac)
            .with_heartbeat_interval_ms(self.heartbeat_interval_ms)
            .with_heartbeat_timeout_ms(self.heartbeat_timeout_ms);
        if let Some(rc) = self.reconnect {
            // `with_reconnect` is flagged deprecated by the transport crate;
            // the lint is silenced because this builder still exposes it.
            #[allow(deprecated)]
            {
                transport = transport.with_reconnect(rc);
            }
        }

        BACnetClient::start(self.config, transport).await
    }
}
297
298impl<T: TransportPort + 'static> BACnetClient<T> {
299    /// Create a generic builder that accepts a pre-built transport.
300    pub fn generic_builder() -> ClientBuilder<T> {
301        ClientBuilder {
302            config: ClientConfig::default(),
303            transport: None,
304        }
305    }
306
307    /// Start the client: bind transport, start network layer, spawn dispatch.
308    pub async fn start(mut config: ClientConfig, transport: T) -> Result<Self, Error> {
309        // Clamp max_apdu_length to the transport's physical limit.
310        let transport_max = transport.max_apdu_length();
311        config.max_apdu_length = config.max_apdu_length.min(transport_max);
312
313        let mut network = NetworkLayer::new(transport);
314        let mut apdu_rx = network.start().await?;
315        let local_mac = MacAddr::from_slice(network.local_mac());
316
317        let network = Arc::new(network);
318
319        let tsm_config = TsmConfig {
320            apdu_timeout_ms: config.apdu_timeout_ms,
321            apdu_retries: config.apdu_retries,
322        };
323        let tsm = Arc::new(Mutex::new(Tsm::new(tsm_config)));
324        let tsm_dispatch = Arc::clone(&tsm);
325        let device_table = Arc::new(Mutex::new(DeviceTable::new()));
326        let device_table_dispatch = Arc::clone(&device_table);
327        let network_dispatch = Arc::clone(&network);
328        let (cov_tx, _) = broadcast::channel::<COVNotificationRequest>(64);
329        let cov_tx_dispatch = cov_tx.clone();
330        let seg_ack_senders: Arc<Mutex<HashMap<SegKey, mpsc::Sender<SegmentAckPdu>>>> =
331            Arc::new(Mutex::new(HashMap::new()));
332        let seg_ack_senders_dispatch = Arc::clone(&seg_ack_senders);
333
334        // Spawn APDU dispatch task
335        let dispatch_task = tokio::spawn(async move {
336            // Segmented receive state is task-local — no external sharing needed.
337            let mut seg_state: HashMap<SegKey, SegmentedReceiveState> = HashMap::new();
338
339            while let Some(received) = apdu_rx.recv().await {
340                // Reap timed-out segmented reassembly sessions (Clause 9.1.6).
341                let now = Instant::now();
342                seg_state.retain(|_key, state| {
343                    now.duration_since(state.last_activity) < SEG_RECEIVER_TIMEOUT
344                });
345
346                match apdu::decode_apdu(received.apdu.clone()) {
347                    Ok(decoded) => {
348                        Self::dispatch_apdu(
349                            &tsm_dispatch,
350                            &device_table_dispatch,
351                            &network_dispatch,
352                            &cov_tx_dispatch,
353                            &mut seg_state,
354                            &seg_ack_senders_dispatch,
355                            &received.source_mac,
356                            decoded,
357                        )
358                        .await;
359                    }
360                    Err(e) => {
361                        warn!(error = %e, "Failed to decode received APDU");
362                    }
363                }
364            }
365        });
366
367        Ok(Self {
368            config,
369            network,
370            tsm,
371            device_table,
372            cov_tx,
373            dispatch_task: Some(dispatch_task),
374            seg_ack_senders,
375            local_mac,
376        })
377    }
378
379    /// Dispatch a received APDU to the appropriate TSM handler.
380    #[allow(clippy::too_many_arguments)]
381    async fn dispatch_apdu(
382        tsm: &Arc<Mutex<Tsm>>,
383        device_table: &Arc<Mutex<DeviceTable>>,
384        network: &Arc<NetworkLayer<T>>,
385        cov_tx: &broadcast::Sender<COVNotificationRequest>,
386        seg_state: &mut HashMap<SegKey, SegmentedReceiveState>,
387        seg_ack_senders: &Arc<Mutex<HashMap<SegKey, mpsc::Sender<SegmentAckPdu>>>>,
388        source_mac: &[u8],
389        apdu: Apdu,
390    ) {
391        match apdu {
392            Apdu::SimpleAck(ack) => {
393                debug!(invoke_id = ack.invoke_id, "Received SimpleAck");
394                let mut tsm = tsm.lock().await;
395                tsm.complete_transaction(source_mac, ack.invoke_id, TsmResponse::SimpleAck);
396            }
397            Apdu::ComplexAck(ack) => {
398                if ack.segmented {
399                    Self::handle_segmented_complex_ack(tsm, network, seg_state, source_mac, ack)
400                        .await;
401                } else {
402                    debug!(invoke_id = ack.invoke_id, "Received ComplexAck");
403                    let mut tsm = tsm.lock().await;
404                    tsm.complete_transaction(
405                        source_mac,
406                        ack.invoke_id,
407                        TsmResponse::ComplexAck {
408                            service_data: ack.service_ack,
409                        },
410                    );
411                }
412            }
413            Apdu::Error(err) => {
414                debug!(invoke_id = err.invoke_id, "Received Error PDU");
415                let mut tsm = tsm.lock().await;
416                tsm.complete_transaction(
417                    source_mac,
418                    err.invoke_id,
419                    TsmResponse::Error {
420                        class: err.error_class.to_raw() as u32,
421                        code: err.error_code.to_raw() as u32,
422                    },
423                );
424            }
425            Apdu::Reject(rej) => {
426                debug!(invoke_id = rej.invoke_id, "Received Reject PDU");
427                let mut tsm = tsm.lock().await;
428                tsm.complete_transaction(
429                    source_mac,
430                    rej.invoke_id,
431                    TsmResponse::Reject {
432                        reason: rej.reject_reason.to_raw(),
433                    },
434                );
435            }
436            Apdu::Abort(abt) => {
437                debug!(invoke_id = abt.invoke_id, "Received Abort PDU");
438                let mut tsm = tsm.lock().await;
439                tsm.complete_transaction(
440                    source_mac,
441                    abt.invoke_id,
442                    TsmResponse::Abort {
443                        reason: abt.abort_reason.to_raw(),
444                    },
445                );
446            }
447            Apdu::ConfirmedRequest(req) => {
448                // Handle ConfirmedCOVNotification from a server
449                if req.service_choice == ConfirmedServiceChoice::CONFIRMED_COV_NOTIFICATION {
450                    match COVNotificationRequest::decode(&req.service_request) {
451                        Ok(notification) => {
452                            debug!(
453                                object = ?notification.monitored_object_identifier,
454                                "Received ConfirmedCOVNotification"
455                            );
456                            let _ = cov_tx.send(notification);
457
458                            // Send SimpleAck back
459                            let ack = Apdu::SimpleAck(SimpleAck {
460                                invoke_id: req.invoke_id,
461                                service_choice: req.service_choice,
462                            });
463                            let mut buf = BytesMut::with_capacity(4);
464                            encode_apdu(&mut buf, &ack);
465                            if let Err(e) = network
466                                .send_apdu(&buf, source_mac, false, NetworkPriority::NORMAL)
467                                .await
468                            {
469                                warn!(error = %e, "Failed to send SimpleAck for COV notification");
470                            }
471                        }
472                        Err(e) => {
473                            warn!(error = %e, "Failed to decode ConfirmedCOVNotification");
474                        }
475                    }
476                } else {
477                    debug!(
478                        service = req.service_choice.to_raw(),
479                        "Ignoring ConfirmedRequest (client mode)"
480                    );
481                }
482            }
483            Apdu::UnconfirmedRequest(req) => {
484                if req.service_choice == UnconfirmedServiceChoice::I_AM {
485                    match bacnet_services::who_is::IAmRequest::decode(&req.service_request) {
486                        Ok(i_am) => {
487                            debug!(
488                                device = i_am.object_identifier.instance_number(),
489                                vendor = i_am.vendor_id,
490                                "Received IAm"
491                            );
492                            let device = DiscoveredDevice {
493                                object_identifier: i_am.object_identifier,
494                                mac_address: MacAddr::from_slice(source_mac),
495                                max_apdu_length: i_am.max_apdu_length,
496                                segmentation_supported: i_am.segmentation_supported,
497                                max_segments_accepted: None,
498                                vendor_id: i_am.vendor_id,
499                                last_seen: std::time::Instant::now(),
500                            };
501                            device_table.lock().await.upsert(device);
502                        }
503                        Err(e) => {
504                            warn!(error = %e, "Failed to decode IAm");
505                        }
506                    }
507                } else if req.service_choice
508                    == UnconfirmedServiceChoice::UNCONFIRMED_COV_NOTIFICATION
509                {
510                    match COVNotificationRequest::decode(&req.service_request) {
511                        Ok(notification) => {
512                            debug!(
513                                object = ?notification.monitored_object_identifier,
514                                "Received UnconfirmedCOVNotification"
515                            );
516                            let _ = cov_tx.send(notification);
517                        }
518                        Err(e) => {
519                            warn!(error = %e, "Failed to decode UnconfirmedCOVNotification");
520                        }
521                    }
522                } else {
523                    debug!(
524                        service = req.service_choice.to_raw(),
525                        "Ignoring unconfirmed service in client dispatch"
526                    );
527                }
528            }
529            Apdu::SegmentAck(sa) => {
530                // Forward to the segmented send in progress for this transaction.
531                let key = (MacAddr::from_slice(source_mac), sa.invoke_id);
532                let senders = seg_ack_senders.lock().await;
533                if let Some(tx) = senders.get(&key) {
534                    let _ = tx.try_send(sa);
535                } else {
536                    debug!(
537                        invoke_id = sa.invoke_id,
538                        "Ignoring SegmentAck for unknown transaction"
539                    );
540                }
541            }
542        }
543    }
544
545    /// Handle a segmented ComplexAck: accumulate segments, send SegmentAck,
546    /// reassemble and complete TSM transaction when all segments received.
547    async fn handle_segmented_complex_ack(
548        tsm: &Arc<Mutex<Tsm>>,
549        network: &Arc<NetworkLayer<T>>,
550        seg_state: &mut HashMap<SegKey, SegmentedReceiveState>,
551        source_mac: &[u8],
552        ack: bacnet_encoding::apdu::ComplexAck,
553    ) {
554        let seq = ack.sequence_number.unwrap_or(0);
555        let key = (MacAddr::from_slice(source_mac), ack.invoke_id);
556
557        debug!(
558            invoke_id = ack.invoke_id,
559            seq = seq,
560            more = ack.more_follows,
561            "Received segmented ComplexAck"
562        );
563
564        // Cap concurrent segmented sessions to prevent resource exhaustion
565        const MAX_CONCURRENT_SEG_SESSIONS: usize = 64;
566        if !seg_state.contains_key(&key) && seg_state.len() >= MAX_CONCURRENT_SEG_SESSIONS {
567            warn!(
568                invoke_id = ack.invoke_id,
569                sessions = seg_state.len(),
570                "Max concurrent segmented sessions reached, dropping segment"
571            );
572            return;
573        }
574
575        // Get or create the receive state for this transaction
576        let state = seg_state
577            .entry(key.clone())
578            .or_insert_with(|| SegmentedReceiveState {
579                receiver: SegmentReceiver::new(),
580                expected_next_seq: 0,
581                last_activity: Instant::now(),
582            });
583
584        // Update activity timestamp for this session.
585        state.last_activity = Instant::now();
586
587        // Gap detection: if the sequence number doesn't match expected, send
588        // a negative SegmentAck requesting retransmission from the last
589        // contiguous sequence number.
590        if seq != state.expected_next_seq {
591            warn!(
592                invoke_id = ack.invoke_id,
593                expected = state.expected_next_seq,
594                received = seq,
595                "Segment gap detected, sending negative SegmentAck"
596            );
597            let neg_ack = Apdu::SegmentAck(SegmentAckPdu {
598                negative_ack: true,
599                sent_by_server: false,
600                invoke_id: ack.invoke_id,
601                sequence_number: state.expected_next_seq,
602                actual_window_size: ack.proposed_window_size.unwrap_or(1),
603            });
604            let mut buf = BytesMut::with_capacity(4);
605            encode_apdu(&mut buf, &neg_ack);
606            if let Err(e) = network
607                .send_apdu(&buf, source_mac, false, NetworkPriority::NORMAL)
608                .await
609            {
610                warn!(error = %e, "Failed to send negative SegmentAck");
611            }
612            return;
613        }
614
615        // Store this segment and advance expected sequence
616        if let Err(e) = state.receiver.receive(seq, ack.service_ack) {
617            warn!(error = %e, "Rejecting oversized segment");
618            return;
619        }
620        state.expected_next_seq = seq.wrapping_add(1);
621
622        // Send SegmentAck to acknowledge receipt
623        let seg_ack = Apdu::SegmentAck(SegmentAckPdu {
624            negative_ack: false,
625            sent_by_server: false,
626            invoke_id: ack.invoke_id,
627            sequence_number: seq,
628            actual_window_size: ack.proposed_window_size.unwrap_or(1),
629        });
630        let mut buf = BytesMut::with_capacity(4);
631        encode_apdu(&mut buf, &seg_ack);
632        if let Err(e) = network
633            .send_apdu(&buf, source_mac, false, NetworkPriority::NORMAL)
634            .await
635        {
636            warn!(error = %e, "Failed to send SegmentAck");
637        }
638
639        // If this is the last segment, reassemble and complete the transaction.
640        // Gap detection: reassemble() validates that segments 0..total are all
641        // present; any missing sequence number produces an Err.
642        if !ack.more_follows {
643            let state = seg_state.remove(&key).unwrap();
644            let total = state.receiver.received_count();
645            match state.receiver.reassemble(total) {
646                Ok(service_data) => {
647                    debug!(
648                        invoke_id = ack.invoke_id,
649                        segments = total,
650                        bytes = service_data.len(),
651                        "Reassembled segmented ComplexAck"
652                    );
653                    let mut tsm = tsm.lock().await;
654                    tsm.complete_transaction(
655                        source_mac,
656                        ack.invoke_id,
657                        TsmResponse::ComplexAck {
658                            service_data: Bytes::from(service_data),
659                        },
660                    );
661                }
662                Err(e) => {
663                    warn!(error = %e, "Failed to reassemble segmented ComplexAck");
664                }
665            }
666        }
667    }
668
    /// Get the client's local MAC address.
    ///
    /// Returns the bytes captured from the network layer when the client was
    /// started.
    pub fn local_mac(&self) -> &[u8] {
        &self.local_mac
    }
673
674    // -----------------------------------------------------------------------
675    // Low-level API
676    // -----------------------------------------------------------------------
677
678    /// Send a confirmed request and wait for the response.
679    ///
680    /// Returns the service response data (empty `Vec` for SimpleAck).
681    /// Returns an error on timeout, protocol error, reject, or abort.
682    ///
683    /// Automatically uses segmented transfer when the payload exceeds the
684    /// remote device's max APDU length.
685    pub async fn confirmed_request(
686        &self,
687        destination_mac: &[u8],
688        service_choice: ConfirmedServiceChoice,
689        service_data: &[u8],
690    ) -> Result<Bytes, Error> {
691        // Check if segmentation is needed.
692        // Non-segmented ConfirmedRequest overhead: 4 bytes (type+flags, max-seg/apdu, invoke, service).
693        let unsegmented_apdu_size = 4 + service_data.len();
694        let (remote_max_apdu, remote_max_segments) = {
695            let dt = self.device_table.lock().await;
696            let device = dt.get_by_mac(destination_mac);
697            let max_apdu = device
698                .map(|d| d.max_apdu_length as u16)
699                .unwrap_or(self.config.max_apdu_length);
700            let max_seg = device.and_then(|d| d.max_segments_accepted);
701            (max_apdu, max_seg)
702        };
703        if unsegmented_apdu_size > remote_max_apdu as usize {
704            return self
705                .segmented_confirmed_request(
706                    destination_mac,
707                    service_choice,
708                    service_data,
709                    remote_max_apdu,
710                    remote_max_segments,
711                )
712                .await;
713        }
714
715        // Allocate invoke ID and register transaction
716        let (invoke_id, rx) = {
717            let mut tsm = self.tsm.lock().await;
718            let invoke_id = tsm.allocate_invoke_id(destination_mac).ok_or_else(|| {
719                Error::Encoding("all invoke IDs exhausted for destination".into())
720            })?;
721            let rx = tsm.register_transaction(MacAddr::from_slice(destination_mac), invoke_id);
722            (invoke_id, rx)
723        };
724
725        // Build ConfirmedRequest APDU
726        let pdu = Apdu::ConfirmedRequest(ConfirmedRequestPdu {
727            segmented: false,
728            more_follows: false,
729            segmented_response_accepted: self.config.segmented_response_accepted,
730            max_segments: self.config.max_segments,
731            max_apdu_length: self.config.max_apdu_length,
732            invoke_id,
733            sequence_number: None,
734            proposed_window_size: None,
735            service_choice,
736            service_request: Bytes::copy_from_slice(service_data),
737        });
738
739        let mut buf = BytesMut::with_capacity(6 + service_data.len());
740        encode_apdu(&mut buf, &pdu);
741
742        // Retry loop per Clause 5.4.2: retransmit on timeout up to apdu_retries times.
743        // The invoke_id stays the same across retries. The TSM transaction is NOT
744        // cancelled between retries — only on final timeout or non-timeout error.
745        let timeout_duration = Duration::from_millis(self.config.apdu_timeout_ms);
746        let max_retries = self.config.apdu_retries;
747        let mut attempts: u8 = 0;
748        let mut rx = rx;
749
750        loop {
751            if let Err(e) = self
752                .network
753                .send_apdu(&buf, destination_mac, true, NetworkPriority::NORMAL)
754                .await
755            {
756                // Clean up the invoke ID on send failure to prevent pool exhaustion
757                let mut tsm = self.tsm.lock().await;
758                tsm.cancel_transaction(destination_mac, invoke_id);
759                return Err(e);
760            }
761
762            match timeout(timeout_duration, &mut rx).await {
763                Ok(Ok(response)) => {
764                    // Response received — convert TsmResponse to Result
765                    return match response {
766                        TsmResponse::SimpleAck => Ok(Bytes::new()),
767                        TsmResponse::ComplexAck { service_data } => Ok(service_data),
768                        TsmResponse::Error { class, code } => Err(Error::Protocol { class, code }),
769                        TsmResponse::Reject { reason } => Err(Error::Reject { reason }),
770                        TsmResponse::Abort { reason } => Err(Error::Abort { reason }),
771                    };
772                }
773                Ok(Err(_)) => {
774                    // Channel closed — TSM transaction was cancelled externally
775                    return Err(Error::Encoding("TSM response channel closed".into()));
776                }
777                Err(_timeout) => {
778                    attempts += 1;
779                    if attempts > max_retries {
780                        // Final timeout — cancel TSM transaction and return error
781                        let mut tsm = self.tsm.lock().await;
782                        tsm.cancel_transaction(destination_mac, invoke_id);
783                        return Err(Error::Timeout(timeout_duration));
784                    }
785                    debug!(
786                        invoke_id,
787                        attempt = attempts,
788                        max_retries,
789                        "APDU timeout, retrying confirmed request"
790                    );
791                }
792            }
793        }
794    }
795
796    /// Send a confirmed request using segmented transfer.
797    ///
798    /// Splits the service data into segments, sends them with windowed flow
799    /// control (SegmentAck from server), then waits for the final response.
800    async fn segmented_confirmed_request(
801        &self,
802        destination_mac: &[u8],
803        service_choice: ConfirmedServiceChoice,
804        service_data: &[u8],
805        remote_max_apdu: u16,
806        remote_max_segments: Option<u32>,
807    ) -> Result<Bytes, Error> {
808        let max_seg_size = max_segment_payload(remote_max_apdu, SegmentedPduType::ConfirmedRequest);
809        let segments = split_payload(service_data, max_seg_size);
810        let total_segments = segments.len();
811
812        if total_segments > 255 {
813            return Err(Error::Segmentation(format!(
814                "payload requires {} segments, maximum is 255",
815                total_segments
816            )));
817        }
818
819        if let Some(max_seg) = remote_max_segments {
820            if total_segments > max_seg as usize {
821                return Err(Error::Segmentation(format!(
822                    "request requires {} segments but remote accepts at most {}",
823                    total_segments, max_seg
824                )));
825            }
826        }
827
828        debug!(
829            total_segments,
830            max_seg_size,
831            service_data_len = service_data.len(),
832            "Starting segmented confirmed request"
833        );
834
835        // Allocate invoke ID and register TSM transaction (for final response).
836        let (invoke_id, rx) = {
837            let mut tsm = self.tsm.lock().await;
838            let invoke_id = tsm.allocate_invoke_id(destination_mac).ok_or_else(|| {
839                Error::Encoding("all invoke IDs exhausted for destination".into())
840            })?;
841            let rx = tsm.register_transaction(MacAddr::from_slice(destination_mac), invoke_id);
842            (invoke_id, rx)
843        };
844
845        // Register a channel for receiving SegmentAck PDUs during the send.
846        let (seg_ack_tx, mut seg_ack_rx) = mpsc::channel(16);
847        {
848            let key = (MacAddr::from_slice(destination_mac), invoke_id);
849            self.seg_ack_senders.lock().await.insert(key, seg_ack_tx);
850        }
851
852        let timeout_duration = Duration::from_millis(self.config.apdu_timeout_ms);
853        let max_ack_retries = self.config.apdu_retries;
854        let mut window_size = self.config.proposed_window_size.max(1) as usize;
855        let mut next_seq: usize = 0;
856        let mut neg_ack_retries: u32 = 0;
857        const MAX_NEG_ACK_RETRIES: u32 = 10;
858
859        // Send segments in windows, waiting for SegmentAck after each window.
860        let result = async {
861            while next_seq < total_segments {
862                let window_end = (next_seq + window_size).min(total_segments);
863
864                for (seq, segment_data) in segments[next_seq..window_end]
865                    .iter()
866                    .enumerate()
867                    .map(|(i, s)| (next_seq + i, s))
868                {
869                    let is_last = seq == total_segments - 1;
870                    let pdu = Apdu::ConfirmedRequest(ConfirmedRequestPdu {
871                        segmented: true,
872                        more_follows: !is_last,
873                        segmented_response_accepted: self.config.segmented_response_accepted,
874                        max_segments: self.config.max_segments,
875                        max_apdu_length: self.config.max_apdu_length,
876                        invoke_id,
877                        sequence_number: Some(seq as u8),
878                        proposed_window_size: Some(self.config.proposed_window_size.max(1)),
879                        service_choice,
880                        service_request: segment_data.clone(),
881                    });
882
883                    let mut buf = BytesMut::with_capacity(remote_max_apdu as usize);
884                    encode_apdu(&mut buf, &pdu);
885
886                    self.network
887                        .send_apdu(&buf, destination_mac, true, NetworkPriority::NORMAL)
888                        .await?;
889
890                    debug!(seq, is_last, "Sent segment");
891                }
892
893                // Wait for SegmentAck with retry logic matching the unsegmented path.
894                let ack = {
895                    let mut ack_retries: u8 = 0;
896                    loop {
897                        match timeout(timeout_duration, seg_ack_rx.recv()).await {
898                            Ok(Some(ack)) => break ack,
899                            Ok(None) => {
900                                return Err(Error::Encoding("SegmentAck channel closed".into()));
901                            }
902                            Err(_timeout) => {
903                                ack_retries += 1;
904                                if ack_retries > max_ack_retries {
905                                    return Err(Error::Timeout(timeout_duration));
906                                }
907                                warn!(
908                                    attempt = ack_retries,
909                                    "Retransmitting segmented request window"
910                                );
911                                // Retransmit the current window.
912                                for (seq, segment_data) in segments[next_seq..window_end]
913                                    .iter()
914                                    .enumerate()
915                                    .map(|(i, s)| (next_seq + i, s))
916                                {
917                                    let is_last = seq == total_segments - 1;
918                                    let pdu = Apdu::ConfirmedRequest(ConfirmedRequestPdu {
919                                        segmented: true,
920                                        more_follows: !is_last,
921                                        segmented_response_accepted: self
922                                            .config
923                                            .segmented_response_accepted,
924                                        max_segments: self.config.max_segments,
925                                        max_apdu_length: self.config.max_apdu_length,
926                                        invoke_id,
927                                        sequence_number: Some(seq as u8),
928                                        proposed_window_size: Some(
929                                            self.config.proposed_window_size.max(1),
930                                        ),
931                                        service_choice,
932                                        service_request: segment_data.clone(),
933                                    });
934
935                                    let mut buf = BytesMut::with_capacity(remote_max_apdu as usize);
936                                    encode_apdu(&mut buf, &pdu);
937
938                                    self.network
939                                        .send_apdu(
940                                            &buf,
941                                            destination_mac,
942                                            true,
943                                            NetworkPriority::NORMAL,
944                                        )
945                                        .await?;
946                                }
947                            }
948                        }
949                    }
950                };
951
952                debug!(
953                    seq = ack.sequence_number,
954                    negative = ack.negative_ack,
955                    window = ack.actual_window_size,
956                    "Received SegmentAck"
957                );
958
959                // Update window size from server's response.
960                window_size = ack.actual_window_size.max(1) as usize;
961
962                if ack.negative_ack {
963                    neg_ack_retries += 1;
964                    if neg_ack_retries > MAX_NEG_ACK_RETRIES {
965                        return Err(Error::Segmentation(
966                            "too many negative SegmentAck retransmissions".into(),
967                        ));
968                    }
969                    // Server is requesting retransmission from this sequence.
970                    next_seq = ack.sequence_number as usize;
971                } else {
972                    neg_ack_retries = 0;
973                    // Advance past the acknowledged segment.
974                    next_seq = ack.sequence_number as usize + 1;
975                }
976            }
977
978            // All segments sent and acknowledged. Wait for final response via TSM.
979            timeout(timeout_duration, rx)
980                .await
981                .map_err(|_| Error::Timeout(timeout_duration))?
982                .map_err(|_| Error::Encoding("TSM response channel closed".into()))
983        }
984        .await;
985
986        // Clean up seg_ack channel regardless of outcome.
987        {
988            let key = (MacAddr::from_slice(destination_mac), invoke_id);
989            self.seg_ack_senders.lock().await.remove(&key);
990        }
991
992        // On error, cancel the TSM transaction.
993        let response = match result {
994            Ok(response) => response,
995            Err(e) => {
996                let mut tsm = self.tsm.lock().await;
997                tsm.cancel_transaction(destination_mac, invoke_id);
998                return Err(e);
999            }
1000        };
1001
1002        match response {
1003            TsmResponse::SimpleAck => Ok(Bytes::new()),
1004            TsmResponse::ComplexAck { service_data } => Ok(service_data),
1005            TsmResponse::Error { class, code } => Err(Error::Protocol { class, code }),
1006            TsmResponse::Reject { reason } => Err(Error::Reject { reason }),
1007            TsmResponse::Abort { reason } => Err(Error::Abort { reason }),
1008        }
1009    }
1010
1011    /// Send an unconfirmed request (fire-and-forget) to a specific destination.
1012    pub async fn unconfirmed_request(
1013        &self,
1014        destination_mac: &[u8],
1015        service_choice: UnconfirmedServiceChoice,
1016        service_data: &[u8],
1017    ) -> Result<(), Error> {
1018        let pdu = Apdu::UnconfirmedRequest(bacnet_encoding::apdu::UnconfirmedRequest {
1019            service_choice,
1020            service_request: Bytes::copy_from_slice(service_data),
1021        });
1022
1023        let mut buf = BytesMut::with_capacity(2 + service_data.len());
1024        encode_apdu(&mut buf, &pdu);
1025
1026        self.network
1027            .send_apdu(&buf, destination_mac, false, NetworkPriority::NORMAL)
1028            .await
1029    }
1030
1031    /// Broadcast an unconfirmed request on the local network.
1032    pub async fn broadcast_unconfirmed(
1033        &self,
1034        service_choice: UnconfirmedServiceChoice,
1035        service_data: &[u8],
1036    ) -> Result<(), Error> {
1037        let pdu = Apdu::UnconfirmedRequest(bacnet_encoding::apdu::UnconfirmedRequest {
1038            service_choice,
1039            service_request: Bytes::copy_from_slice(service_data),
1040        });
1041
1042        let mut buf = BytesMut::with_capacity(2 + service_data.len());
1043        encode_apdu(&mut buf, &pdu);
1044
1045        self.network
1046            .broadcast_apdu(&buf, false, NetworkPriority::NORMAL)
1047            .await
1048    }
1049
1050    /// Broadcast an unconfirmed request globally (DNET=0xFFFF).
1051    ///
1052    /// Unlike `broadcast_unconfirmed()` which only reaches the local subnet,
1053    /// this sends a global broadcast that routers will forward to all networks.
1054    pub async fn broadcast_global_unconfirmed(
1055        &self,
1056        service_choice: UnconfirmedServiceChoice,
1057        service_data: &[u8],
1058    ) -> Result<(), Error> {
1059        let pdu = Apdu::UnconfirmedRequest(bacnet_encoding::apdu::UnconfirmedRequest {
1060            service_choice,
1061            service_request: Bytes::copy_from_slice(service_data),
1062        });
1063
1064        let mut buf = BytesMut::with_capacity(2 + service_data.len());
1065        encode_apdu(&mut buf, &pdu);
1066
1067        self.network
1068            .broadcast_global_apdu(&buf, false, NetworkPriority::NORMAL)
1069            .await
1070    }
1071
1072    // -----------------------------------------------------------------------
1073    // High-level API
1074    // -----------------------------------------------------------------------
1075
1076    /// Read a property from a remote device.
1077    pub async fn read_property(
1078        &self,
1079        destination_mac: &[u8],
1080        object_identifier: bacnet_types::primitives::ObjectIdentifier,
1081        property_identifier: bacnet_types::enums::PropertyIdentifier,
1082        property_array_index: Option<u32>,
1083    ) -> Result<bacnet_services::read_property::ReadPropertyACK, Error> {
1084        use bacnet_services::read_property::ReadPropertyRequest;
1085
1086        let request = ReadPropertyRequest {
1087            object_identifier,
1088            property_identifier,
1089            property_array_index,
1090        };
1091        let mut buf = BytesMut::new();
1092        request.encode(&mut buf);
1093
1094        let response_data = self
1095            .confirmed_request(destination_mac, ConfirmedServiceChoice::READ_PROPERTY, &buf)
1096            .await?;
1097
1098        bacnet_services::read_property::ReadPropertyACK::decode(&response_data)
1099    }
1100
1101    /// Write a property on a remote device.
1102    pub async fn write_property(
1103        &self,
1104        destination_mac: &[u8],
1105        object_identifier: bacnet_types::primitives::ObjectIdentifier,
1106        property_identifier: bacnet_types::enums::PropertyIdentifier,
1107        property_array_index: Option<u32>,
1108        property_value: Vec<u8>,
1109        priority: Option<u8>,
1110    ) -> Result<(), Error> {
1111        use bacnet_services::write_property::WritePropertyRequest;
1112
1113        let request = WritePropertyRequest {
1114            object_identifier,
1115            property_identifier,
1116            property_array_index,
1117            property_value,
1118            priority,
1119        };
1120        let mut buf = BytesMut::new();
1121        request.encode(&mut buf);
1122
1123        let _ = self
1124            .confirmed_request(
1125                destination_mac,
1126                ConfirmedServiceChoice::WRITE_PROPERTY,
1127                &buf,
1128            )
1129            .await?;
1130
1131        Ok(())
1132    }
1133
1134    /// Read multiple properties from one or more objects on a remote device.
1135    pub async fn read_property_multiple(
1136        &self,
1137        destination_mac: &[u8],
1138        specs: Vec<bacnet_services::rpm::ReadAccessSpecification>,
1139    ) -> Result<bacnet_services::rpm::ReadPropertyMultipleACK, Error> {
1140        use bacnet_services::rpm::{ReadPropertyMultipleACK, ReadPropertyMultipleRequest};
1141
1142        let request = ReadPropertyMultipleRequest {
1143            list_of_read_access_specs: specs,
1144        };
1145        let mut buf = BytesMut::new();
1146        request.encode(&mut buf);
1147
1148        let response_data = self
1149            .confirmed_request(
1150                destination_mac,
1151                ConfirmedServiceChoice::READ_PROPERTY_MULTIPLE,
1152                &buf,
1153            )
1154            .await?;
1155
1156        ReadPropertyMultipleACK::decode(&response_data)
1157    }
1158
1159    /// Write multiple properties on one or more objects on a remote device.
1160    pub async fn write_property_multiple(
1161        &self,
1162        destination_mac: &[u8],
1163        specs: Vec<bacnet_services::wpm::WriteAccessSpecification>,
1164    ) -> Result<(), Error> {
1165        use bacnet_services::wpm::WritePropertyMultipleRequest;
1166
1167        let request = WritePropertyMultipleRequest {
1168            list_of_write_access_specs: specs,
1169        };
1170        let mut buf = BytesMut::new();
1171        request.encode(&mut buf);
1172
1173        let _ = self
1174            .confirmed_request(
1175                destination_mac,
1176                ConfirmedServiceChoice::WRITE_PROPERTY_MULTIPLE,
1177                &buf,
1178            )
1179            .await?;
1180
1181        Ok(())
1182    }
1183
1184    /// Send a WhoIs broadcast to discover devices.
1185    pub async fn who_is(
1186        &self,
1187        low_limit: Option<u32>,
1188        high_limit: Option<u32>,
1189    ) -> Result<(), Error> {
1190        use bacnet_services::who_is::WhoIsRequest;
1191
1192        let request = WhoIsRequest {
1193            low_limit,
1194            high_limit,
1195        };
1196        let mut buf = BytesMut::new();
1197        request.encode(&mut buf);
1198
1199        self.broadcast_global_unconfirmed(UnconfirmedServiceChoice::WHO_IS, &buf)
1200            .await
1201    }
1202
1203    /// Send a WhoHas broadcast to find an object by identifier or name.
1204    pub async fn who_has(
1205        &self,
1206        object: bacnet_services::who_has::WhoHasObject,
1207        low_limit: Option<u32>,
1208        high_limit: Option<u32>,
1209    ) -> Result<(), Error> {
1210        use bacnet_services::who_has::WhoHasRequest;
1211
1212        let request = WhoHasRequest {
1213            low_limit,
1214            high_limit,
1215            object,
1216        };
1217        let mut buf = BytesMut::new();
1218        request.encode(&mut buf)?;
1219
1220        self.broadcast_unconfirmed(UnconfirmedServiceChoice::WHO_HAS, &buf)
1221            .await
1222    }
1223
1224    /// Subscribe to COV notifications for an object on a remote device.
1225    pub async fn subscribe_cov(
1226        &self,
1227        destination_mac: &[u8],
1228        subscriber_process_identifier: u32,
1229        monitored_object_identifier: bacnet_types::primitives::ObjectIdentifier,
1230        confirmed: bool,
1231        lifetime: Option<u32>,
1232    ) -> Result<(), Error> {
1233        use bacnet_services::cov::SubscribeCOVRequest;
1234
1235        let request = SubscribeCOVRequest {
1236            subscriber_process_identifier,
1237            monitored_object_identifier,
1238            issue_confirmed_notifications: Some(confirmed),
1239            lifetime,
1240        };
1241        let mut buf = BytesMut::new();
1242        request.encode(&mut buf);
1243
1244        let _ = self
1245            .confirmed_request(destination_mac, ConfirmedServiceChoice::SUBSCRIBE_COV, &buf)
1246            .await?;
1247
1248        Ok(())
1249    }
1250
1251    /// Cancel a COV subscription on a remote device.
1252    pub async fn unsubscribe_cov(
1253        &self,
1254        destination_mac: &[u8],
1255        subscriber_process_identifier: u32,
1256        monitored_object_identifier: bacnet_types::primitives::ObjectIdentifier,
1257    ) -> Result<(), Error> {
1258        use bacnet_services::cov::SubscribeCOVRequest;
1259
1260        let request = SubscribeCOVRequest {
1261            subscriber_process_identifier,
1262            monitored_object_identifier,
1263            issue_confirmed_notifications: None,
1264            lifetime: None,
1265        };
1266        let mut buf = BytesMut::new();
1267        request.encode(&mut buf);
1268
1269        let _ = self
1270            .confirmed_request(destination_mac, ConfirmedServiceChoice::SUBSCRIBE_COV, &buf)
1271            .await?;
1272
1273        Ok(())
1274    }
1275
1276    /// Delete an object on a remote device.
1277    pub async fn delete_object(
1278        &self,
1279        destination_mac: &[u8],
1280        object_identifier: bacnet_types::primitives::ObjectIdentifier,
1281    ) -> Result<(), Error> {
1282        use bacnet_services::object_mgmt::DeleteObjectRequest;
1283
1284        let request = DeleteObjectRequest { object_identifier };
1285        let mut buf = BytesMut::new();
1286        request.encode(&mut buf);
1287
1288        let _ = self
1289            .confirmed_request(destination_mac, ConfirmedServiceChoice::DELETE_OBJECT, &buf)
1290            .await?;
1291
1292        Ok(())
1293    }
1294
1295    /// Create an object on a remote device.
1296    pub async fn create_object(
1297        &self,
1298        destination_mac: &[u8],
1299        object_specifier: bacnet_services::object_mgmt::ObjectSpecifier,
1300        initial_values: Vec<bacnet_services::common::BACnetPropertyValue>,
1301    ) -> Result<Bytes, Error> {
1302        use bacnet_services::object_mgmt::CreateObjectRequest;
1303
1304        let request = CreateObjectRequest {
1305            object_specifier,
1306            list_of_initial_values: initial_values,
1307        };
1308        let mut buf = BytesMut::new();
1309        request.encode(&mut buf);
1310
1311        self.confirmed_request(destination_mac, ConfirmedServiceChoice::CREATE_OBJECT, &buf)
1312            .await
1313    }
1314
1315    /// Send DeviceCommunicationControl to a remote device.
1316    pub async fn device_communication_control(
1317        &self,
1318        destination_mac: &[u8],
1319        enable_disable: bacnet_types::enums::EnableDisable,
1320        time_duration: Option<u16>,
1321        password: Option<String>,
1322    ) -> Result<(), Error> {
1323        use bacnet_services::device_mgmt::DeviceCommunicationControlRequest;
1324
1325        let request = DeviceCommunicationControlRequest {
1326            time_duration,
1327            enable_disable,
1328            password,
1329        };
1330        let mut buf = BytesMut::new();
1331        request.encode(&mut buf)?;
1332
1333        let _ = self
1334            .confirmed_request(
1335                destination_mac,
1336                ConfirmedServiceChoice::DEVICE_COMMUNICATION_CONTROL,
1337                &buf,
1338            )
1339            .await?;
1340
1341        Ok(())
1342    }
1343
1344    /// Send ReinitializeDevice to a remote device.
1345    pub async fn reinitialize_device(
1346        &self,
1347        destination_mac: &[u8],
1348        reinitialized_state: bacnet_types::enums::ReinitializedState,
1349        password: Option<String>,
1350    ) -> Result<(), Error> {
1351        use bacnet_services::device_mgmt::ReinitializeDeviceRequest;
1352
1353        let request = ReinitializeDeviceRequest {
1354            reinitialized_state,
1355            password,
1356        };
1357        let mut buf = BytesMut::new();
1358        request.encode(&mut buf)?;
1359
1360        let _ = self
1361            .confirmed_request(
1362                destination_mac,
1363                ConfirmedServiceChoice::REINITIALIZE_DEVICE,
1364                &buf,
1365            )
1366            .await?;
1367
1368        Ok(())
1369    }
1370
1371    /// Get event information from a remote device.
1372    pub async fn get_event_information(
1373        &self,
1374        destination_mac: &[u8],
1375        last_received_object_identifier: Option<bacnet_types::primitives::ObjectIdentifier>,
1376    ) -> Result<Bytes, Error> {
1377        use bacnet_services::alarm_event::GetEventInformationRequest;
1378
1379        let request = GetEventInformationRequest {
1380            last_received_object_identifier,
1381        };
1382        let mut buf = BytesMut::new();
1383        request.encode(&mut buf);
1384
1385        self.confirmed_request(
1386            destination_mac,
1387            ConfirmedServiceChoice::GET_EVENT_INFORMATION,
1388            &buf,
1389        )
1390        .await
1391    }
1392
1393    /// Acknowledge an alarm on a remote device.
1394    pub async fn acknowledge_alarm(
1395        &self,
1396        destination_mac: &[u8],
1397        acknowledging_process_identifier: u32,
1398        event_object_identifier: bacnet_types::primitives::ObjectIdentifier,
1399        event_state_acknowledged: u32,
1400        acknowledgment_source: &str,
1401    ) -> Result<(), Error> {
1402        use bacnet_services::alarm_event::AcknowledgeAlarmRequest;
1403
1404        let request = AcknowledgeAlarmRequest {
1405            acknowledging_process_identifier,
1406            event_object_identifier,
1407            event_state_acknowledged,
1408            timestamp: bacnet_types::primitives::BACnetTimeStamp::SequenceNumber(0),
1409            acknowledgment_source: acknowledgment_source.to_string(),
1410        };
1411        let mut buf = BytesMut::new();
1412        request.encode(&mut buf)?;
1413
1414        let _ = self
1415            .confirmed_request(
1416                destination_mac,
1417                ConfirmedServiceChoice::ACKNOWLEDGE_ALARM,
1418                &buf,
1419            )
1420            .await?;
1421
1422        Ok(())
1423    }
1424
1425    /// Read a range of items from a list or log-buffer property.
1426    pub async fn read_range(
1427        &self,
1428        destination_mac: &[u8],
1429        object_identifier: bacnet_types::primitives::ObjectIdentifier,
1430        property_identifier: bacnet_types::enums::PropertyIdentifier,
1431        property_array_index: Option<u32>,
1432        range: Option<bacnet_services::read_range::RangeSpec>,
1433    ) -> Result<bacnet_services::read_range::ReadRangeAck, Error> {
1434        use bacnet_services::read_range::{ReadRangeAck, ReadRangeRequest};
1435
1436        let request = ReadRangeRequest {
1437            object_identifier,
1438            property_identifier,
1439            property_array_index,
1440            range,
1441        };
1442        let mut buf = BytesMut::new();
1443        request.encode(&mut buf);
1444
1445        let response_data = self
1446            .confirmed_request(destination_mac, ConfirmedServiceChoice::READ_RANGE, &buf)
1447            .await?;
1448
1449        ReadRangeAck::decode(&response_data)
1450    }
1451
1452    /// Read file data from a remote device (stream or record access).
1453    pub async fn atomic_read_file(
1454        &self,
1455        destination_mac: &[u8],
1456        file_identifier: bacnet_types::primitives::ObjectIdentifier,
1457        access: bacnet_services::file::FileAccessMethod,
1458    ) -> Result<Bytes, Error> {
1459        use bacnet_services::file::AtomicReadFileRequest;
1460
1461        let request = AtomicReadFileRequest {
1462            file_identifier,
1463            access,
1464        };
1465        let mut buf = BytesMut::new();
1466        request.encode(&mut buf);
1467
1468        self.confirmed_request(
1469            destination_mac,
1470            ConfirmedServiceChoice::ATOMIC_READ_FILE,
1471            &buf,
1472        )
1473        .await
1474    }
1475
1476    /// Write file data to a remote device (stream or record access).
1477    pub async fn atomic_write_file(
1478        &self,
1479        destination_mac: &[u8],
1480        file_identifier: bacnet_types::primitives::ObjectIdentifier,
1481        access: bacnet_services::file::FileWriteAccessMethod,
1482    ) -> Result<Bytes, Error> {
1483        use bacnet_services::file::AtomicWriteFileRequest;
1484
1485        let request = AtomicWriteFileRequest {
1486            file_identifier,
1487            access,
1488        };
1489        let mut buf = BytesMut::new();
1490        request.encode(&mut buf);
1491
1492        self.confirmed_request(
1493            destination_mac,
1494            ConfirmedServiceChoice::ATOMIC_WRITE_FILE,
1495            &buf,
1496        )
1497        .await
1498    }
1499
1500    /// Add elements to a list property on a remote device.
1501    pub async fn add_list_element(
1502        &self,
1503        destination_mac: &[u8],
1504        object_identifier: bacnet_types::primitives::ObjectIdentifier,
1505        property_identifier: bacnet_types::enums::PropertyIdentifier,
1506        property_array_index: Option<u32>,
1507        list_of_elements: Vec<u8>,
1508    ) -> Result<(), Error> {
1509        use bacnet_services::list_manipulation::ListElementRequest;
1510
1511        let request = ListElementRequest {
1512            object_identifier,
1513            property_identifier,
1514            property_array_index,
1515            list_of_elements,
1516        };
1517        let mut buf = BytesMut::new();
1518        request.encode(&mut buf);
1519
1520        let _ = self
1521            .confirmed_request(
1522                destination_mac,
1523                ConfirmedServiceChoice::ADD_LIST_ELEMENT,
1524                &buf,
1525            )
1526            .await?;
1527
1528        Ok(())
1529    }
1530
1531    /// Remove elements from a list property on a remote device.
1532    pub async fn remove_list_element(
1533        &self,
1534        destination_mac: &[u8],
1535        object_identifier: bacnet_types::primitives::ObjectIdentifier,
1536        property_identifier: bacnet_types::enums::PropertyIdentifier,
1537        property_array_index: Option<u32>,
1538        list_of_elements: Vec<u8>,
1539    ) -> Result<(), Error> {
1540        use bacnet_services::list_manipulation::ListElementRequest;
1541
1542        let request = ListElementRequest {
1543            object_identifier,
1544            property_identifier,
1545            property_array_index,
1546            list_of_elements,
1547        };
1548        let mut buf = BytesMut::new();
1549        request.encode(&mut buf);
1550
1551        let _ = self
1552            .confirmed_request(
1553                destination_mac,
1554                ConfirmedServiceChoice::REMOVE_LIST_ELEMENT,
1555                &buf,
1556            )
1557            .await?;
1558
1559        Ok(())
1560    }
1561
1562    /// Get a receiver for incoming COV notifications.
1563    ///
1564    /// Can be called multiple times — each call returns a new independent
1565    /// receiver that gets all notifications from that point forward.
1566    pub fn cov_notifications(&self) -> broadcast::Receiver<COVNotificationRequest> {
1567        self.cov_tx.subscribe()
1568    }
1569
1570    // -----------------------------------------------------------------------
1571    // Device discovery
1572    // -----------------------------------------------------------------------
1573
1574    /// Get a snapshot of all discovered devices (from IAm responses).
1575    pub async fn discovered_devices(&self) -> Vec<DiscoveredDevice> {
1576        self.device_table.lock().await.all()
1577    }
1578
1579    /// Look up a discovered device by instance number.
1580    pub async fn get_device(&self, instance: u32) -> Option<DiscoveredDevice> {
1581        self.device_table.lock().await.get(instance).cloned()
1582    }
1583
1584    /// Clear the discovered devices table.
1585    pub async fn clear_devices(&self) {
1586        self.device_table.lock().await.clear();
1587    }
1588
1589    /// Stop the client, aborting the dispatch task.
1590    pub async fn stop(&mut self) -> Result<(), Error> {
1591        if let Some(task) = self.dispatch_task.take() {
1592            task.abort();
1593            let _ = task.await;
1594        }
1595        // Network/transport cleanup happens when the Arc is dropped.
1596        Ok(())
1597    }
1598}
1599
1600#[cfg(test)]
1601mod tests {
1602    use super::*;
1603    use bacnet_encoding::apdu::{ComplexAck, SimpleAck};
1604    use std::net::Ipv4Addr;
1605    use tokio::time::Duration;
1606
1607    /// Helper: build a client on loopback with ephemeral port and short timeout.
1608    async fn make_client() -> BACnetClient<BipTransport> {
1609        BACnetClient::builder()
1610            .interface(Ipv4Addr::LOCALHOST)
1611            .port(0)
1612            .apdu_timeout_ms(2000)
1613            .build()
1614            .await
1615            .unwrap()
1616    }
1617
1618    #[tokio::test]
1619    async fn client_start_stop() {
1620        let mut client = make_client().await;
1621        assert!(!client.local_mac().is_empty());
1622        client.stop().await.unwrap();
1623    }
1624
1625    #[tokio::test]
1626    async fn confirmed_request_simple_ack() {
1627        let mut client_a = make_client().await;
1628
1629        // Create a second network layer to act as "server B"
1630        let transport_b = BipTransport::new(Ipv4Addr::LOCALHOST, 0, Ipv4Addr::BROADCAST);
1631        let mut net_b = NetworkLayer::new(transport_b);
1632        let mut rx_b = net_b.start().await.unwrap();
1633        let b_mac = net_b.local_mac().to_vec();
1634
1635        // Spawn a task that receives the request and sends back SimpleAck
1636        let b_handle = tokio::spawn(async move {
1637            let received = timeout(Duration::from_secs(2), rx_b.recv())
1638                .await
1639                .expect("B timed out")
1640                .expect("B channel closed");
1641
1642            let decoded = apdu::decode_apdu(received.apdu.clone()).unwrap();
1643            if let Apdu::ConfirmedRequest(req) = decoded {
1644                let ack = Apdu::SimpleAck(SimpleAck {
1645                    invoke_id: req.invoke_id,
1646                    service_choice: req.service_choice,
1647                });
1648                let mut buf = BytesMut::new();
1649                encode_apdu(&mut buf, &ack);
1650                net_b
1651                    .send_apdu(&buf, &received.source_mac, false, NetworkPriority::NORMAL)
1652                    .await
1653                    .unwrap();
1654            }
1655            net_b.stop().await.unwrap();
1656        });
1657
1658        let result = client_a
1659            .confirmed_request(
1660                &b_mac,
1661                ConfirmedServiceChoice::WRITE_PROPERTY,
1662                &[0x01, 0x02],
1663            )
1664            .await;
1665
1666        assert!(result.is_ok());
1667        let response = result.unwrap();
1668        assert!(response.is_empty()); // SimpleAck has no service data
1669
1670        b_handle.await.unwrap();
1671        client_a.stop().await.unwrap();
1672    }
1673
1674    #[tokio::test]
1675    async fn confirmed_request_complex_ack() {
1676        let mut client_a = make_client().await;
1677
1678        let transport_b = BipTransport::new(Ipv4Addr::LOCALHOST, 0, Ipv4Addr::BROADCAST);
1679        let mut net_b = NetworkLayer::new(transport_b);
1680        let mut rx_b = net_b.start().await.unwrap();
1681        let b_mac = net_b.local_mac().to_vec();
1682
1683        let b_handle = tokio::spawn(async move {
1684            let received = timeout(Duration::from_secs(2), rx_b.recv())
1685                .await
1686                .unwrap()
1687                .unwrap();
1688
1689            let decoded = apdu::decode_apdu(received.apdu.clone()).unwrap();
1690            if let Apdu::ConfirmedRequest(req) = decoded {
1691                let ack = Apdu::ComplexAck(ComplexAck {
1692                    segmented: false,
1693                    more_follows: false,
1694                    invoke_id: req.invoke_id,
1695                    sequence_number: None,
1696                    proposed_window_size: None,
1697                    service_choice: req.service_choice,
1698                    service_ack: Bytes::from_static(&[0xDE, 0xAD, 0xBE, 0xEF]),
1699                });
1700                let mut buf = BytesMut::new();
1701                encode_apdu(&mut buf, &ack);
1702                net_b
1703                    .send_apdu(&buf, &received.source_mac, false, NetworkPriority::NORMAL)
1704                    .await
1705                    .unwrap();
1706            }
1707            net_b.stop().await.unwrap();
1708        });
1709
1710        let result = client_a
1711            .confirmed_request(&b_mac, ConfirmedServiceChoice::READ_PROPERTY, &[0x01])
1712            .await;
1713
1714        assert!(result.is_ok());
1715        assert_eq!(result.unwrap(), vec![0xDE, 0xAD, 0xBE, 0xEF]);
1716
1717        b_handle.await.unwrap();
1718        client_a.stop().await.unwrap();
1719    }
1720
1721    #[tokio::test]
1722    async fn confirmed_request_timeout() {
1723        let mut client = make_client().await;
1724        // Send to a non-existent address — should timeout
1725        let fake_mac = vec![10, 99, 99, 99, 0xBA, 0xC0];
1726        let result = client
1727            .confirmed_request(&fake_mac, ConfirmedServiceChoice::READ_PROPERTY, &[0x01])
1728            .await;
1729        assert!(result.is_err());
1730        client.stop().await.unwrap();
1731    }
1732
    /// A ComplexAck delivered in three segments is reassembled by the client
    /// into a single response.
    ///
    /// The fake server sends segments with sequence numbers 0, 1 and 2 and a
    /// proposed window size of 1. After each one it waits for a SegmentAck and
    /// checks that the ack carries the same invoke id and sequence number.
    #[tokio::test]
    async fn segmented_complex_ack_reassembly() {
        let mut client = make_client().await;

        // Bare network layer acting as the remote device ("server B").
        let transport_b = BipTransport::new(Ipv4Addr::LOCALHOST, 0, Ipv4Addr::BROADCAST);
        let mut net_b = NetworkLayer::new(transport_b);
        let mut rx_b = net_b.start().await.unwrap();
        let b_mac = net_b.local_mac().to_vec();

        // Server B: receive request, respond with 3-segment ComplexAck.
        // Wait for SegmentAck after each segment before sending the next.
        let b_handle = tokio::spawn(async move {
            // Receive the ConfirmedRequest
            let received = timeout(Duration::from_secs(2), rx_b.recv())
                .await
                .unwrap()
                .unwrap();

            // Only the invoke id is taken from the request; every segment and
            // every expected SegmentAck below is matched against it.
            let decoded = apdu::decode_apdu(received.apdu.clone()).unwrap();
            let invoke_id = if let Apdu::ConfirmedRequest(req) = decoded {
                req.invoke_id
            } else {
                panic!("Expected ConfirmedRequest");
            };

            // Three payload chunks (3 + 3 + 2 bytes); concatenated they form
            // the 8-byte sequence asserted at the end of the test.
            let service_choice = ConfirmedServiceChoice::READ_PROPERTY;
            let segments: Vec<Bytes> = vec![
                Bytes::from_static(&[0x01, 0x02, 0x03]),
                Bytes::from_static(&[0x04, 0x05, 0x06]),
                Bytes::from_static(&[0x07, 0x08]),
            ];

            for (i, seg) in segments.iter().enumerate() {
                // more_follows is set on every segment except the final one.
                let is_last = i == segments.len() - 1;
                let ack = Apdu::ComplexAck(ComplexAck {
                    segmented: true,
                    more_follows: !is_last,
                    invoke_id,
                    sequence_number: Some(i as u8),
                    proposed_window_size: Some(1),
                    service_choice,
                    service_ack: seg.clone(),
                });
                let mut buf = BytesMut::new();
                encode_apdu(&mut buf, &ack);
                net_b
                    .send_apdu(&buf, &received.source_mac, false, NetworkPriority::NORMAL)
                    .await
                    .unwrap();

                // Wait for SegmentAck from client
                let seg_ack_msg = timeout(Duration::from_secs(2), rx_b.recv())
                    .await
                    .unwrap()
                    .unwrap();
                let decoded = apdu::decode_apdu(seg_ack_msg.apdu.clone()).unwrap();
                if let Apdu::SegmentAck(sa) = decoded {
                    // The ack must refer to the segment that was just sent.
                    assert_eq!(sa.invoke_id, invoke_id);
                    assert_eq!(sa.sequence_number, i as u8);
                } else {
                    panic!("Expected SegmentAck, got {:?}", decoded);
                }
            }

            net_b.stop().await.unwrap();
        });

        // Client sends a request and should receive the reassembled response
        let result = client
            .confirmed_request(&b_mac, ConfirmedServiceChoice::READ_PROPERTY, &[0x01])
            .await;

        assert!(result.is_ok());
        assert_eq!(
            result.unwrap(),
            vec![0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08]
        );

        // Joining the server task surfaces any assertion failure raised inside it.
        b_handle.await.unwrap();
        client.stop().await.unwrap();
    }
1814
    /// A request whose service data exceeds the client's APDU limit is sent as
    /// segmented ConfirmedRequests, and the fake server can rebuild the data.
    ///
    /// The server acknowledges every segment with a SegmentAck and collects the
    /// segment payloads. Once a segment arrives with `more_follows` cleared, it
    /// replies with a SimpleAck. The test then checks that the client saw an
    /// empty response and that the server collected exactly the bytes sent.
    #[tokio::test]
    async fn segmented_confirmed_request_sends_segments() {
        // Client with max_apdu_length=50 → max segment payload = 44 bytes.
        // Any service_data > 46 bytes will trigger segmentation.
        let mut client = BACnetClient::builder()
            .interface(Ipv4Addr::LOCALHOST)
            .port(0)
            .apdu_timeout_ms(5000)
            .max_apdu_length(50)
            .build()
            .await
            .unwrap();

        // Bare network layer acting as the remote device.
        let transport_b = BipTransport::new(Ipv4Addr::LOCALHOST, 0, Ipv4Addr::BROADCAST);
        let mut net_b = NetworkLayer::new(transport_b);
        let mut rx_b = net_b.start().await.unwrap();
        let b_mac = net_b.local_mac().to_vec();

        // 100 bytes of service data → ceil(100/44) = 3 segments (44 + 44 + 12)
        let service_data: Vec<u8> = (0u8..100).collect();
        let expected_data = service_data.clone();

        let b_handle = tokio::spawn(async move {
            let mut all_service_data = Vec::new();
            // Both are assigned on every received segment; the loop can only
            // exit after at least one assignment, so they are initialised when
            // used for the final SimpleAck.
            let mut client_mac;
            let mut invoke_id;

            loop {
                let received = timeout(Duration::from_secs(3), rx_b.recv())
                    .await
                    .expect("server timed out waiting for segment")
                    .expect("server channel closed");

                let decoded = apdu::decode_apdu(received.apdu.clone()).unwrap();
                if let Apdu::ConfirmedRequest(req) = decoded {
                    assert!(req.segmented, "expected segmented request");
                    invoke_id = req.invoke_id;
                    client_mac = received.source_mac.clone();
                    let seq = req.sequence_number.unwrap();
                    // Payloads are appended in arrival order.
                    all_service_data.extend_from_slice(&req.service_request);

                    // Send SegmentAck
                    let seg_ack = Apdu::SegmentAck(SegmentAckPdu {
                        negative_ack: false,
                        sent_by_server: true,
                        invoke_id,
                        sequence_number: seq,
                        actual_window_size: 1,
                    });
                    let mut buf = BytesMut::new();
                    encode_apdu(&mut buf, &seg_ack);
                    net_b
                        .send_apdu(&buf, &received.source_mac, false, NetworkPriority::NORMAL)
                        .await
                        .unwrap();

                    // The final segment has more_follows cleared.
                    if !req.more_follows {
                        break;
                    }
                } else {
                    panic!("Expected ConfirmedRequest, got {:?}", decoded);
                }
            }

            // All segments received — send SimpleAck
            let ack = Apdu::SimpleAck(SimpleAck {
                invoke_id,
                service_choice: ConfirmedServiceChoice::WRITE_PROPERTY,
            });
            let mut buf = BytesMut::new();
            encode_apdu(&mut buf, &ack);
            net_b
                .send_apdu(&buf, &client_mac, false, NetworkPriority::NORMAL)
                .await
                .unwrap();

            net_b.stop().await.unwrap();
            // Hand the collected payload back to the test body for comparison.
            all_service_data
        });

        let result = client
            .confirmed_request(
                &b_mac,
                ConfirmedServiceChoice::WRITE_PROPERTY,
                &service_data,
            )
            .await;

        assert!(result.is_ok());
        assert!(result.unwrap().is_empty()); // SimpleAck has no service data

        // Verify server received all service data correctly
        let received_data = b_handle.await.unwrap();
        assert_eq!(received_data, expected_data);

        client.stop().await.unwrap();
    }
1912
    /// A segmented request can be answered with an unsegmented ComplexAck.
    ///
    /// The fake server acknowledges each ConfirmedRequest segment with a
    /// SegmentAck. After the segment with `more_follows` cleared, it replies
    /// with a two-byte ComplexAck, which the client must return as the result.
    #[tokio::test]
    async fn segmented_request_with_complex_ack_response() {
        // Same small APDU limit (50) as the other segmentation tests in this module.
        let mut client = BACnetClient::builder()
            .interface(Ipv4Addr::LOCALHOST)
            .port(0)
            .apdu_timeout_ms(5000)
            .max_apdu_length(50)
            .build()
            .await
            .unwrap();

        // Bare network layer acting as the remote device.
        let transport_b = BipTransport::new(Ipv4Addr::LOCALHOST, 0, Ipv4Addr::BROADCAST);
        let mut net_b = NetworkLayer::new(transport_b);
        let mut rx_b = net_b.start().await.unwrap();
        let b_mac = net_b.local_mac().to_vec();

        // 60 bytes → 2 segments (44 + 16)
        let service_data: Vec<u8> = (0u8..60).collect();

        let b_handle = tokio::spawn(async move {
            // Assigned on every received segment; the loop only exits after an
            // assignment, so both are initialised when the ComplexAck is sent.
            let mut client_mac;
            let mut invoke_id;

            loop {
                let received = timeout(Duration::from_secs(3), rx_b.recv())
                    .await
                    .unwrap()
                    .unwrap();

                // APDUs that are not ConfirmedRequests are skipped and the
                // loop waits for the next message.
                let decoded = apdu::decode_apdu(received.apdu.clone()).unwrap();
                if let Apdu::ConfirmedRequest(req) = decoded {
                    invoke_id = req.invoke_id;
                    client_mac = received.source_mac.clone();
                    let seq = req.sequence_number.unwrap();

                    // Acknowledge this segment by echoing its sequence number.
                    let seg_ack = Apdu::SegmentAck(SegmentAckPdu {
                        negative_ack: false,
                        sent_by_server: true,
                        invoke_id,
                        sequence_number: seq,
                        actual_window_size: 1,
                    });
                    let mut buf = BytesMut::new();
                    encode_apdu(&mut buf, &seg_ack);
                    net_b
                        .send_apdu(&buf, &received.source_mac, false, NetworkPriority::NORMAL)
                        .await
                        .unwrap();

                    // The final segment has more_follows cleared.
                    if !req.more_follows {
                        break;
                    }
                }
            }

            // Send ComplexAck response
            let ack = Apdu::ComplexAck(ComplexAck {
                segmented: false,
                more_follows: false,
                invoke_id,
                sequence_number: None,
                proposed_window_size: None,
                service_choice: ConfirmedServiceChoice::READ_PROPERTY,
                service_ack: Bytes::from_static(&[0xCA, 0xFE]),
            });
            let mut buf = BytesMut::new();
            encode_apdu(&mut buf, &ack);
            net_b
                .send_apdu(&buf, &client_mac, false, NetworkPriority::NORMAL)
                .await
                .unwrap();

            net_b.stop().await.unwrap();
        });

        let result = client
            .confirmed_request(&b_mac, ConfirmedServiceChoice::READ_PROPERTY, &service_data)
            .await;

        // The client must hand back exactly the ComplexAck's service data.
        assert!(result.is_ok());
        assert_eq!(result.unwrap(), vec![0xCA, 0xFE]);

        b_handle.await.unwrap();
        client.stop().await.unwrap();
    }
1998
1999    #[tokio::test]
2000    async fn segment_overflow_guard() {
2001        // Create a client with small max_apdu_length so segments are tiny.
2002        let mut client = BACnetClient::builder()
2003            .interface(Ipv4Addr::LOCALHOST)
2004            .port(0)
2005            .apdu_timeout_ms(2000)
2006            .max_apdu_length(50)
2007            .build()
2008            .await
2009            .unwrap();
2010
2011        // With max_apdu=50, each segment carries 50-6=44 bytes.
2012        // 256 * 44 = 11,264 bytes → 256 segments, which exceeds the u8 limit.
2013        let huge_payload = vec![0u8; 256 * 44];
2014
2015        // Use a fake destination MAC not in the device table — the client
2016        // will fall back to its own max_apdu_length (50), triggering segmentation.
2017        let fake_mac = vec![10, 99, 99, 99, 0xBA, 0xC0];
2018
2019        let result = client
2020            .confirmed_request(
2021                &fake_mac,
2022                ConfirmedServiceChoice::READ_PROPERTY,
2023                &huge_payload,
2024            )
2025            .await;
2026
2027        assert!(result.is_err());
2028        let err_msg = result.unwrap_err().to_string();
2029        assert!(
2030            err_msg.contains("256 segments"),
2031            "expected segment overflow error, got: {}",
2032            err_msg
2033        );
2034
2035        client.stop().await.unwrap();
2036    }
2037
2038    #[test]
2039    fn seg_receiver_timeout_is_4s() {
2040        assert_eq!(SEG_RECEIVER_TIMEOUT, Duration::from_secs(4));
2041    }
2042}