bacnet_client/client/
lifecycle.rs1use super::*;
2
3impl<T: TransportPort + 'static> BACnetClient<T> {
4 pub async fn start(mut config: ClientConfig, transport: T) -> Result<Self, Error> {
6 let transport_max = transport.max_apdu_length();
7 config.max_apdu_length = config.max_apdu_length.min(transport_max);
8 validate_max_apdu_length(config.max_apdu_length)?;
9 if !(1..=127).contains(&config.proposed_window_size) {
10 return Err(Error::Encoding(format!(
11 "invalid proposed-window-size {}; expected 1..=127",
12 config.proposed_window_size
13 )));
14 }
15
16 let mut network = NetworkLayer::new(transport);
17 let mut apdu_rx = network.start().await?;
18 let local_mac = MacAddr::from_slice(network.local_mac());
19
20 let network = Arc::new(network);
21
22 let tsm_config = TsmConfig {
23 apdu_timeout_ms: config.apdu_timeout_ms,
24 apdu_segment_timeout_ms: config.apdu_timeout_ms,
25 apdu_retries: config.apdu_retries,
26 };
27 let tsm = Arc::new(Mutex::new(Tsm::new(tsm_config)));
28 let tsm_dispatch = Arc::clone(&tsm);
29 let device_table = Arc::new(Mutex::new(DeviceTable::new()));
30 let device_table_dispatch = Arc::clone(&device_table);
31 let network_dispatch = Arc::clone(&network);
32 let (cov_tx, _) = broadcast::channel::<COVNotificationRequest>(64);
33 let cov_tx_dispatch = cov_tx.clone();
34 let seg_ack_senders: Arc<Mutex<HashMap<SegKey, mpsc::Sender<SegmentAckPdu>>>> =
35 Arc::new(Mutex::new(HashMap::new()));
36 let seg_ack_senders_dispatch = Arc::clone(&seg_ack_senders);
37 let segmented_response_accepted = config.segmented_response_accepted;
38
39 let dispatch_task = tokio::spawn(async move {
40 let mut seg_state: HashMap<SegKey, SegmentedReceiveState> = HashMap::new();
41 let mut last_device_purge = Instant::now();
42 const DEVICE_PURGE_INTERVAL: Duration = Duration::from_secs(300);
43 const DEVICE_MAX_AGE: Duration = Duration::from_secs(600);
44
45 while let Some(received) = apdu_rx.recv().await {
46 let now = Instant::now();
47
48 if now.duration_since(last_device_purge) >= DEVICE_PURGE_INTERVAL {
50 device_table_dispatch
51 .lock()
52 .await
53 .purge_stale(DEVICE_MAX_AGE);
54 last_device_purge = now;
55 }
56 let stale_keys: Vec<SegKey> = seg_state
58 .iter()
59 .filter(|(_, state)| {
60 now.duration_since(state.last_activity) >= SEG_RECEIVER_TIMEOUT
61 })
62 .map(|(key, _)| key.clone())
63 .collect();
64 for key in &stale_keys {
65 if let Some(state) = seg_state.remove(key) {
66 let abort = Apdu::Abort(AbortPdu {
67 sent_by_server: false,
68 invoke_id: key.1,
69 abort_reason: bacnet_types::enums::AbortReason::TSM_TIMEOUT,
70 });
71 let mut buf = BytesMut::with_capacity(4);
72 if let Err(e) = encode_apdu(&mut buf, &abort) {
73 warn!(error = %e, "Failed to encode segmented receive timeout Abort");
74 continue;
75 }
76 let _ = network_dispatch
77 .send_apdu(&buf, &state.reply_mac, false, NetworkPriority::NORMAL)
78 .await;
79 }
80 }
81
82 match apdu::decode_apdu(received.apdu.clone()) {
83 Ok(decoded) => {
84 Self::dispatch_apdu(
85 &tsm_dispatch,
86 &device_table_dispatch,
87 &network_dispatch,
88 &cov_tx_dispatch,
89 &mut seg_state,
90 &seg_ack_senders_dispatch,
91 &received.source_mac,
92 &received.source_network,
93 decoded,
94 segmented_response_accepted,
95 )
96 .await;
97 }
98 Err(e) => {
99 warn!(error = %e, "Failed to decode received APDU");
100 }
101 }
102 }
103 });
104
105 Ok(Self {
106 config,
107 network,
108 tsm,
109 device_table,
110 cov_tx,
111 dispatch_task: Some(dispatch_task),
112 seg_ack_senders,
113 local_mac,
114 })
115 }
116 pub fn local_mac(&self) -> &[u8] {
118 &self.local_mac
119 }
120 pub async fn stop(&mut self) -> Result<(), Error> {
122 if let Some(task) = self.dispatch_task.take() {
123 task.abort();
124 let _ = task.await;
125 }
126 Ok(())
127 }
128}