use super::*;
impl<T: TransportPort + 'static> BACnetClient<T> {
pub(super) async fn handle_segmented_complex_ack(
tsm: &Arc<Mutex<Tsm>>,
network: &Arc<NetworkLayer<T>>,
seg_state: &mut HashMap<SegKey, SegmentedReceiveState>,
source_mac: &[u8],
source_network: &Option<NpduAddress>,
ack: bacnet_encoding::apdu::ComplexAck,
segmented_response_accepted: bool,
) {
let seq = ack.sequence_number.unwrap_or(0);
let tsm_mac = response_tsm_mac(source_mac, source_network);
let key = (tsm_mac.clone(), ack.invoke_id);
debug!(
invoke_id = ack.invoke_id,
seq = seq,
more = ack.more_follows,
"Received segmented ComplexAck"
);
if !segmented_response_accepted {
let abort = Apdu::Abort(AbortPdu {
sent_by_server: false,
invoke_id: ack.invoke_id,
abort_reason: bacnet_types::enums::AbortReason::SEGMENTATION_NOT_SUPPORTED,
});
let mut buf = BytesMut::with_capacity(4);
if let Err(e) = encode_apdu(&mut buf, &abort) {
warn!(error = %e, "Failed to encode segmentation-not-supported Abort");
return;
}
let _ = network
.send_apdu(&buf, source_mac, false, NetworkPriority::NORMAL)
.await;
return;
}
const MAX_CONCURRENT_SEG_SESSIONS: usize = 64;
if !seg_state.contains_key(&key) && seg_state.len() >= MAX_CONCURRENT_SEG_SESSIONS {
warn!(
invoke_id = ack.invoke_id,
sessions = seg_state.len(),
"Max concurrent segmented sessions reached, dropping segment"
);
return;
}
let proposed_ws = ack.proposed_window_size.unwrap_or(1);
let state = seg_state
.entry(key.clone())
.or_insert_with(|| SegmentedReceiveState {
receiver: SegmentReceiver::new(),
reply_mac: MacAddr::from_slice(source_mac),
expected_next_seq: 0,
last_activity: Instant::now(),
window_position: 0,
proposed_window_size: proposed_ws,
});
state.last_activity = Instant::now();
if seq != state.expected_next_seq {
if seq < state.expected_next_seq {
debug!(
invoke_id = ack.invoke_id,
seq, "Discarding duplicate segment"
);
} else {
warn!(
invoke_id = ack.invoke_id,
expected = state.expected_next_seq,
received = seq,
"Segment gap detected, sending negative SegmentAck"
);
}
let neg_ack = Apdu::SegmentAck(SegmentAckPdu {
negative_ack: seq >= state.expected_next_seq,
sent_by_server: false,
invoke_id: ack.invoke_id,
sequence_number: if state.expected_next_seq > 0 {
state.expected_next_seq.wrapping_sub(1)
} else {
0
},
actual_window_size: proposed_ws,
});
let mut buf = BytesMut::with_capacity(4);
if let Err(e) = encode_apdu(&mut buf, &neg_ack) {
warn!(error = %e, "Failed to encode negative SegmentAck");
return;
}
if let Err(e) = network
.send_apdu(&buf, source_mac, false, NetworkPriority::NORMAL)
.await
{
warn!(error = %e, "Failed to send SegmentAck");
}
return;
}
if let Err(e) = state.receiver.receive(seq, ack.service_ack) {
warn!(error = %e, "Rejecting oversized segment");
return;
}
state.expected_next_seq = seq.wrapping_add(1);
state.window_position += 1;
let should_ack = !ack.more_follows || state.window_position >= state.proposed_window_size;
if should_ack {
state.window_position = 0;
let seg_ack = Apdu::SegmentAck(SegmentAckPdu {
negative_ack: false,
sent_by_server: false,
invoke_id: ack.invoke_id,
sequence_number: seq,
actual_window_size: proposed_ws,
});
let mut buf = BytesMut::with_capacity(4);
if let Err(e) = encode_apdu(&mut buf, &seg_ack) {
warn!(error = %e, "Failed to encode SegmentAck");
return;
}
if let Err(e) = network
.send_apdu(&buf, source_mac, false, NetworkPriority::NORMAL)
.await
{
warn!(error = %e, "Failed to send SegmentAck");
}
}
if !ack.more_follows {
let state = seg_state.remove(&key).unwrap();
let total = state.receiver.received_count();
match state.receiver.reassemble(total) {
Ok(service_data) => {
debug!(
invoke_id = ack.invoke_id,
segments = total,
bytes = service_data.len(),
"Reassembled segmented ComplexAck"
);
let mut tsm = tsm.lock().await;
tsm.complete_transaction(
&tsm_mac,
ack.invoke_id,
TsmResponse::ComplexAck {
service_data: Bytes::from(service_data),
},
);
}
Err(e) => {
warn!(error = %e, "Failed to reassemble segmented ComplexAck");
}
}
}
}
pub(super) async fn segmented_confirmed_request(
&self,
target: ConfirmedTarget<'_>,
service_choice: ConfirmedServiceChoice,
service_data: &[u8],
remote_max_apdu: u16,
remote_max_segments: Option<u32>,
) -> Result<Bytes, Error> {
let tsm_mac = target.tsm_mac();
let max_seg_size = max_segment_payload(remote_max_apdu, SegmentedPduType::ConfirmedRequest);
let segments = split_payload(service_data, max_seg_size)?;
let total_segments = segments.len();
if let Some(max_seg) = remote_max_segments {
if total_segments > max_seg as usize {
return Err(Error::Segmentation(format!(
"request requires {} segments but remote accepts at most {}",
total_segments, max_seg
)));
}
}
debug!(
total_segments,
max_seg_size,
service_data_len = service_data.len(),
"Starting segmented confirmed request"
);
let (invoke_id, rx) = {
let mut tsm = self.tsm.lock().await;
let invoke_id = tsm.allocate_invoke_id(&tsm_mac).ok_or_else(|| {
Error::Encoding("all invoke IDs exhausted for destination".into())
})?;
let rx = tsm.register_transaction(tsm_mac.clone(), invoke_id);
(invoke_id, rx)
};
let (seg_ack_tx, mut seg_ack_rx) = mpsc::channel(16);
{
let key = (tsm_mac.clone(), invoke_id);
self.seg_ack_senders.lock().await.insert(key, seg_ack_tx);
}
let timeout_duration = Duration::from_millis(self.config.apdu_timeout_ms);
let max_ack_retries = self.config.apdu_retries;
let mut window_size = self.config.proposed_window_size.max(1) as usize;
let mut next_seq: usize = 0;
let mut neg_ack_retries: u32 = 0;
const MAX_NEG_ACK_RETRIES: u32 = 10;
let result = async {
while next_seq < total_segments {
let window_end = (next_seq + window_size).min(total_segments);
for (seq, segment_data) in segments[next_seq..window_end]
.iter()
.enumerate()
.map(|(i, s)| (next_seq + i, s))
{
let is_last = seq == total_segments - 1;
let pdu = Apdu::ConfirmedRequest(ConfirmedRequestPdu {
segmented: true,
more_follows: !is_last,
segmented_response_accepted: self.config.segmented_response_accepted,
max_segments: self.config.max_segments,
max_apdu_length: self.config.max_apdu_length,
invoke_id,
sequence_number: Some(seq as u8),
proposed_window_size: Some(self.config.proposed_window_size.max(1)),
service_choice,
service_request: segment_data.clone(),
});
let mut buf = BytesMut::with_capacity(remote_max_apdu as usize);
encode_apdu(&mut buf, &pdu)?;
self.send_confirmed_target_apdu(target, &buf).await?;
debug!(seq, is_last, "Sent segment");
}
let ack = {
let mut ack_retries: u8 = 0;
loop {
match timeout(timeout_duration, seg_ack_rx.recv()).await {
Ok(Some(ack)) => break ack,
Ok(None) => {
return Err(Error::Encoding("SegmentAck channel closed".into()));
}
Err(_timeout) => {
ack_retries += 1;
if ack_retries > max_ack_retries {
return Err(Error::Timeout(timeout_duration));
}
warn!(
attempt = ack_retries,
"Retransmitting segmented request window"
);
for (seq, segment_data) in segments[next_seq..window_end]
.iter()
.enumerate()
.map(|(i, s)| (next_seq + i, s))
{
let is_last = seq == total_segments - 1;
let pdu = Apdu::ConfirmedRequest(ConfirmedRequestPdu {
segmented: true,
more_follows: !is_last,
segmented_response_accepted: self
.config
.segmented_response_accepted,
max_segments: self.config.max_segments,
max_apdu_length: self.config.max_apdu_length,
invoke_id,
sequence_number: Some(seq as u8),
proposed_window_size: Some(
self.config.proposed_window_size.max(1),
),
service_choice,
service_request: segment_data.clone(),
});
let mut buf = BytesMut::with_capacity(remote_max_apdu as usize);
encode_apdu(&mut buf, &pdu)?;
self.send_confirmed_target_apdu(target, &buf).await?;
}
}
}
}
};
debug!(
seq = ack.sequence_number,
negative = ack.negative_ack,
window = ack.actual_window_size,
"Received SegmentAck"
);
window_size = ack.actual_window_size.max(1) as usize;
let ack_seq = ack.sequence_number as usize;
if ack_seq >= total_segments {
return Err(Error::Segmentation(format!(
"SegmentAck sequence {} out of range (total {})",
ack_seq, total_segments
)));
}
if ack.negative_ack {
neg_ack_retries += 1;
if neg_ack_retries > MAX_NEG_ACK_RETRIES {
return Err(Error::Segmentation(
"too many negative SegmentAck retransmissions".into(),
));
}
next_seq = ack_seq + 1;
} else {
neg_ack_retries = 0;
next_seq = ack_seq + 1;
}
}
timeout(timeout_duration, rx)
.await
.map_err(|_| Error::Timeout(timeout_duration))?
.map_err(|_| Error::Encoding("TSM response channel closed".into()))
}
.await;
{
let key = (tsm_mac.clone(), invoke_id);
self.seg_ack_senders.lock().await.remove(&key);
}
let response = match result {
Ok(response) => response,
Err(e) => {
let mut tsm = self.tsm.lock().await;
tsm.cancel_transaction(&tsm_mac, invoke_id);
return Err(e);
}
};
match response {
TsmResponse::SimpleAck => Ok(Bytes::new()),
TsmResponse::ComplexAck { service_data } => Ok(service_data),
TsmResponse::Error { class, code } => Err(Error::Protocol { class, code }),
TsmResponse::Reject { reason } => Err(Error::Reject { reason }),
TsmResponse::Abort { reason } => Err(Error::Abort { reason }),
}
}
}