Skip to main content

bacnet_client/client/
lifecycle.rs

1use super::*;
2
3impl<T: TransportPort + 'static> BACnetClient<T> {
4    /// Start the client: bind transport, start network layer, spawn dispatch.
5    pub async fn start(mut config: ClientConfig, transport: T) -> Result<Self, Error> {
6        let transport_max = transport.max_apdu_length();
7        config.max_apdu_length = config.max_apdu_length.min(transport_max);
8        validate_max_apdu_length(config.max_apdu_length)?;
9        if !(1..=127).contains(&config.proposed_window_size) {
10            return Err(Error::Encoding(format!(
11                "invalid proposed-window-size {}; expected 1..=127",
12                config.proposed_window_size
13            )));
14        }
15
16        let mut network = NetworkLayer::new(transport);
17        let mut apdu_rx = network.start().await?;
18        let local_mac = MacAddr::from_slice(network.local_mac());
19
20        let network = Arc::new(network);
21
22        let tsm_config = TsmConfig {
23            apdu_timeout_ms: config.apdu_timeout_ms,
24            apdu_segment_timeout_ms: config.apdu_timeout_ms,
25            apdu_retries: config.apdu_retries,
26        };
27        let tsm = Arc::new(Mutex::new(Tsm::new(tsm_config)));
28        let tsm_dispatch = Arc::clone(&tsm);
29        let device_table = Arc::new(Mutex::new(DeviceTable::new()));
30        let device_table_dispatch = Arc::clone(&device_table);
31        let network_dispatch = Arc::clone(&network);
32        let (cov_tx, _) = broadcast::channel::<COVNotificationRequest>(64);
33        let cov_tx_dispatch = cov_tx.clone();
34        let seg_ack_senders: Arc<Mutex<HashMap<SegKey, mpsc::Sender<SegmentAckPdu>>>> =
35            Arc::new(Mutex::new(HashMap::new()));
36        let seg_ack_senders_dispatch = Arc::clone(&seg_ack_senders);
37        let segmented_response_accepted = config.segmented_response_accepted;
38
39        let dispatch_task = tokio::spawn(async move {
40            let mut seg_state: HashMap<SegKey, SegmentedReceiveState> = HashMap::new();
41            let mut last_device_purge = Instant::now();
42            const DEVICE_PURGE_INTERVAL: Duration = Duration::from_secs(300);
43            const DEVICE_MAX_AGE: Duration = Duration::from_secs(600);
44
45            while let Some(received) = apdu_rx.recv().await {
46                let now = Instant::now();
47
48                // Periodically purge stale device table entries
49                if now.duration_since(last_device_purge) >= DEVICE_PURGE_INTERVAL {
50                    device_table_dispatch
51                        .lock()
52                        .await
53                        .purge_stale(DEVICE_MAX_AGE);
54                    last_device_purge = now;
55                }
56                // Reap stale segmented sessions and send Abort to the server
57                let stale_keys: Vec<SegKey> = seg_state
58                    .iter()
59                    .filter(|(_, state)| {
60                        now.duration_since(state.last_activity) >= SEG_RECEIVER_TIMEOUT
61                    })
62                    .map(|(key, _)| key.clone())
63                    .collect();
64                for key in &stale_keys {
65                    if let Some(state) = seg_state.remove(key) {
66                        let abort = Apdu::Abort(AbortPdu {
67                            sent_by_server: false,
68                            invoke_id: key.1,
69                            abort_reason: bacnet_types::enums::AbortReason::TSM_TIMEOUT,
70                        });
71                        let mut buf = BytesMut::with_capacity(4);
72                        if let Err(e) = encode_apdu(&mut buf, &abort) {
73                            warn!(error = %e, "Failed to encode segmented receive timeout Abort");
74                            continue;
75                        }
76                        let _ = network_dispatch
77                            .send_apdu(&buf, &state.reply_mac, false, NetworkPriority::NORMAL)
78                            .await;
79                    }
80                }
81
82                match apdu::decode_apdu(received.apdu.clone()) {
83                    Ok(decoded) => {
84                        Self::dispatch_apdu(
85                            &tsm_dispatch,
86                            &device_table_dispatch,
87                            &network_dispatch,
88                            &cov_tx_dispatch,
89                            &mut seg_state,
90                            &seg_ack_senders_dispatch,
91                            &received.source_mac,
92                            &received.source_network,
93                            decoded,
94                            segmented_response_accepted,
95                        )
96                        .await;
97                    }
98                    Err(e) => {
99                        warn!(error = %e, "Failed to decode received APDU");
100                    }
101                }
102            }
103        });
104
105        Ok(Self {
106            config,
107            network,
108            tsm,
109            device_table,
110            cov_tx,
111            dispatch_task: Some(dispatch_task),
112            seg_ack_senders,
113            local_mac,
114        })
115    }
116    /// Get the client's local MAC address.
117    pub fn local_mac(&self) -> &[u8] {
118        &self.local_mac
119    }
120    /// Stop the client, aborting the dispatch task.
121    pub async fn stop(&mut self) -> Result<(), Error> {
122        if let Some(task) = self.dispatch_task.take() {
123            task.abort();
124            let _ = task.await;
125        }
126        Ok(())
127    }
128}