use super::*;
impl<T: TransportPort + 'static> BACnetClient<T> {
pub async fn start(mut config: ClientConfig, transport: T) -> Result<Self, Error> {
let transport_max = transport.max_apdu_length();
config.max_apdu_length = config.max_apdu_length.min(transport_max);
validate_max_apdu_length(config.max_apdu_length)?;
if !(1..=127).contains(&config.proposed_window_size) {
return Err(Error::Encoding(format!(
"invalid proposed-window-size {}; expected 1..=127",
config.proposed_window_size
)));
}
let mut network = NetworkLayer::new(transport);
let mut apdu_rx = network.start().await?;
let local_mac = MacAddr::from_slice(network.local_mac());
let network = Arc::new(network);
let tsm_config = TsmConfig {
apdu_timeout_ms: config.apdu_timeout_ms,
apdu_segment_timeout_ms: config.apdu_timeout_ms,
apdu_retries: config.apdu_retries,
};
let tsm = Arc::new(Mutex::new(Tsm::new(tsm_config)));
let tsm_dispatch = Arc::clone(&tsm);
let device_table = Arc::new(Mutex::new(DeviceTable::new()));
let device_table_dispatch = Arc::clone(&device_table);
let network_dispatch = Arc::clone(&network);
let (cov_tx, _) = broadcast::channel::<COVNotificationRequest>(64);
let cov_tx_dispatch = cov_tx.clone();
let seg_ack_senders: Arc<Mutex<HashMap<SegKey, mpsc::Sender<SegmentAckPdu>>>> =
Arc::new(Mutex::new(HashMap::new()));
let seg_ack_senders_dispatch = Arc::clone(&seg_ack_senders);
let segmented_response_accepted = config.segmented_response_accepted;
let dispatch_task = tokio::spawn(async move {
let mut seg_state: HashMap<SegKey, SegmentedReceiveState> = HashMap::new();
let mut last_device_purge = Instant::now();
const DEVICE_PURGE_INTERVAL: Duration = Duration::from_secs(300);
const DEVICE_MAX_AGE: Duration = Duration::from_secs(600);
while let Some(received) = apdu_rx.recv().await {
let now = Instant::now();
if now.duration_since(last_device_purge) >= DEVICE_PURGE_INTERVAL {
device_table_dispatch
.lock()
.await
.purge_stale(DEVICE_MAX_AGE);
last_device_purge = now;
}
let stale_keys: Vec<SegKey> = seg_state
.iter()
.filter(|(_, state)| {
now.duration_since(state.last_activity) >= SEG_RECEIVER_TIMEOUT
})
.map(|(key, _)| key.clone())
.collect();
for key in &stale_keys {
if let Some(state) = seg_state.remove(key) {
let abort = Apdu::Abort(AbortPdu {
sent_by_server: false,
invoke_id: key.1,
abort_reason: bacnet_types::enums::AbortReason::TSM_TIMEOUT,
});
let mut buf = BytesMut::with_capacity(4);
if let Err(e) = encode_apdu(&mut buf, &abort) {
warn!(error = %e, "Failed to encode segmented receive timeout Abort");
continue;
}
let _ = network_dispatch
.send_apdu(&buf, &state.reply_mac, false, NetworkPriority::NORMAL)
.await;
}
}
match apdu::decode_apdu(received.apdu.clone()) {
Ok(decoded) => {
Self::dispatch_apdu(
&tsm_dispatch,
&device_table_dispatch,
&network_dispatch,
&cov_tx_dispatch,
&mut seg_state,
&seg_ack_senders_dispatch,
&received.source_mac,
&received.source_network,
decoded,
segmented_response_accepted,
)
.await;
}
Err(e) => {
warn!(error = %e, "Failed to decode received APDU");
}
}
}
});
Ok(Self {
config,
network,
tsm,
device_table,
cov_tx,
dispatch_task: Some(dispatch_task),
seg_ack_senders,
local_mac,
})
}
pub fn local_mac(&self) -> &[u8] {
&self.local_mac
}
pub async fn stop(&mut self) -> Result<(), Error> {
if let Some(task) = self.dispatch_task.take() {
task.abort();
let _ = task.await;
}
Ok(())
}
}