1use std::collections::HashMap;
7use std::net::Ipv4Addr;
8use std::sync::Arc;
9use std::time::Instant;
10
11use bytes::{Bytes, BytesMut};
12use tokio::sync::{broadcast, mpsc, Mutex};
13use tokio::task::JoinHandle;
14use tokio::time::{timeout, Duration};
15use tracing::{debug, warn};
16
17use bacnet_encoding::apdu::{
18 self, encode_apdu, Apdu, ConfirmedRequest as ConfirmedRequestPdu, SegmentAck as SegmentAckPdu,
19 SimpleAck,
20};
21use bacnet_network::layer::NetworkLayer;
22use bacnet_services::cov::COVNotificationRequest;
23use bacnet_transport::bip::BipTransport;
24use bacnet_transport::port::TransportPort;
25use bacnet_types::enums::{ConfirmedServiceChoice, NetworkPriority, UnconfirmedServiceChoice};
26use bacnet_types::error::Error;
27use bacnet_types::MacAddr;
28
29use crate::discovery::{DeviceTable, DiscoveredDevice};
30use crate::segmentation::{max_segment_payload, split_payload, SegmentReceiver, SegmentedPduType};
31use crate::tsm::{Tsm, TsmConfig, TsmResponse};
32
33#[derive(Debug, Clone)]
35pub struct ClientConfig {
36 pub interface: Ipv4Addr,
38 pub port: u16,
40 pub broadcast_address: Ipv4Addr,
42 pub apdu_timeout_ms: u64,
44 pub apdu_retries: u8,
46 pub max_apdu_length: u16,
48 pub max_segments: Option<u8>,
50 pub segmented_response_accepted: bool,
52 pub proposed_window_size: u8,
54}
55
56impl Default for ClientConfig {
57 fn default() -> Self {
58 Self {
59 interface: Ipv4Addr::UNSPECIFIED,
60 port: 0xBAC0,
61 broadcast_address: Ipv4Addr::BROADCAST,
62 apdu_timeout_ms: 6000,
63 apdu_retries: 3,
64 max_apdu_length: 1476,
65 max_segments: None,
66 segmented_response_accepted: true,
67 proposed_window_size: 1,
68 }
69 }
70}
71
72pub struct ClientBuilder<T: TransportPort> {
74 config: ClientConfig,
75 transport: Option<T>,
76}
77
78impl<T: TransportPort + 'static> ClientBuilder<T> {
79 pub fn transport(mut self, transport: T) -> Self {
81 self.transport = Some(transport);
82 self
83 }
84
85 pub fn apdu_timeout_ms(mut self, ms: u64) -> Self {
87 self.config.apdu_timeout_ms = ms;
88 self
89 }
90
91 pub fn max_apdu_length(mut self, len: u16) -> Self {
93 self.config.max_apdu_length = len;
94 self
95 }
96
97 pub async fn build(self) -> Result<BACnetClient<T>, Error> {
99 let transport = self
100 .transport
101 .ok_or_else(|| Error::Encoding("transport not set on ClientBuilder".into()))?;
102 BACnetClient::start(self.config, transport).await
103 }
104}
105
106pub struct BipClientBuilder {
108 config: ClientConfig,
109}
110
111impl BipClientBuilder {
112 pub fn interface(mut self, ip: Ipv4Addr) -> Self {
114 self.config.interface = ip;
115 self
116 }
117
118 pub fn port(mut self, port: u16) -> Self {
120 self.config.port = port;
121 self
122 }
123
124 pub fn broadcast_address(mut self, addr: Ipv4Addr) -> Self {
126 self.config.broadcast_address = addr;
127 self
128 }
129
130 pub fn apdu_timeout_ms(mut self, ms: u64) -> Self {
132 self.config.apdu_timeout_ms = ms;
133 self
134 }
135
136 pub fn max_apdu_length(mut self, len: u16) -> Self {
138 self.config.max_apdu_length = len;
139 self
140 }
141
142 pub async fn build(self) -> Result<BACnetClient<BipTransport>, Error> {
144 let transport = BipTransport::new(
145 self.config.interface,
146 self.config.port,
147 self.config.broadcast_address,
148 );
149 BACnetClient::start(self.config, transport).await
150 }
151}
152
153struct SegmentedReceiveState {
155 receiver: SegmentReceiver,
156 expected_next_seq: u8,
158 last_activity: Instant,
160}
161
162const SEG_RECEIVER_TIMEOUT: Duration = Duration::from_secs(4);
164
165type SegKey = (MacAddr, u8);
167
168pub struct BACnetClient<T: TransportPort> {
170 config: ClientConfig,
171 network: Arc<NetworkLayer<T>>,
172 tsm: Arc<Mutex<Tsm>>,
173 device_table: Arc<Mutex<DeviceTable>>,
174 cov_tx: broadcast::Sender<COVNotificationRequest>,
175 dispatch_task: Option<JoinHandle<()>>,
176 seg_ack_senders: Arc<Mutex<HashMap<SegKey, mpsc::Sender<SegmentAckPdu>>>>,
177 local_mac: MacAddr,
178}
179
180impl BACnetClient<BipTransport> {
181 pub fn bip_builder() -> BipClientBuilder {
183 BipClientBuilder {
184 config: ClientConfig::default(),
185 }
186 }
187
188 pub fn builder() -> BipClientBuilder {
190 Self::bip_builder()
191 }
192}
193
194#[cfg(feature = "sc-tls")]
195impl BACnetClient<bacnet_transport::sc::ScTransport<bacnet_transport::sc_tls::TlsWebSocket>> {
196 pub fn sc_builder() -> ScClientBuilder {
198 ScClientBuilder {
199 config: ClientConfig::default(),
200 hub_url: String::new(),
201 tls_config: None,
202 vmac: [0; 6],
203 heartbeat_interval_ms: 30_000,
204 heartbeat_timeout_ms: 60_000,
205 reconnect: None,
206 }
207 }
208}
209
210#[cfg(feature = "sc-tls")]
214pub struct ScClientBuilder {
215 config: ClientConfig,
216 hub_url: String,
217 tls_config: Option<std::sync::Arc<tokio_rustls::rustls::ClientConfig>>,
218 vmac: bacnet_transport::sc_frame::Vmac,
219 heartbeat_interval_ms: u64,
220 heartbeat_timeout_ms: u64,
221 reconnect: Option<bacnet_transport::sc::ScReconnectConfig>,
222}
223
224#[cfg(feature = "sc-tls")]
225impl ScClientBuilder {
226 pub fn hub_url(mut self, url: &str) -> Self {
228 self.hub_url = url.to_string();
229 self
230 }
231
232 pub fn tls_config(
234 mut self,
235 config: std::sync::Arc<tokio_rustls::rustls::ClientConfig>,
236 ) -> Self {
237 self.tls_config = Some(config);
238 self
239 }
240
241 pub fn vmac(mut self, vmac: [u8; 6]) -> Self {
243 self.vmac = vmac;
244 self
245 }
246
247 pub fn apdu_timeout_ms(mut self, ms: u64) -> Self {
249 self.config.apdu_timeout_ms = ms;
250 self
251 }
252
253 pub fn heartbeat_interval_ms(mut self, ms: u64) -> Self {
255 self.heartbeat_interval_ms = ms;
256 self
257 }
258
259 pub fn heartbeat_timeout_ms(mut self, ms: u64) -> Self {
261 self.heartbeat_timeout_ms = ms;
262 self
263 }
264
265 pub fn reconnect(mut self, config: bacnet_transport::sc::ScReconnectConfig) -> Self {
267 self.reconnect = Some(config);
268 self
269 }
270
271 pub async fn build(
273 self,
274 ) -> Result<
275 BACnetClient<bacnet_transport::sc::ScTransport<bacnet_transport::sc_tls::TlsWebSocket>>,
276 Error,
277 > {
278 let tls_config = self
279 .tls_config
280 .ok_or_else(|| Error::Encoding("SC client builder: tls_config is required".into()))?;
281
282 let ws = bacnet_transport::sc_tls::TlsWebSocket::connect(&self.hub_url, tls_config).await?;
283
284 let mut transport = bacnet_transport::sc::ScTransport::new(ws, self.vmac)
285 .with_heartbeat_interval_ms(self.heartbeat_interval_ms)
286 .with_heartbeat_timeout_ms(self.heartbeat_timeout_ms);
287 if let Some(rc) = self.reconnect {
288 #[allow(deprecated)]
289 {
290 transport = transport.with_reconnect(rc);
291 }
292 }
293
294 BACnetClient::start(self.config, transport).await
295 }
296}
297
298impl<T: TransportPort + 'static> BACnetClient<T> {
299 pub fn generic_builder() -> ClientBuilder<T> {
301 ClientBuilder {
302 config: ClientConfig::default(),
303 transport: None,
304 }
305 }
306
307 pub async fn start(mut config: ClientConfig, transport: T) -> Result<Self, Error> {
309 let transport_max = transport.max_apdu_length();
311 config.max_apdu_length = config.max_apdu_length.min(transport_max);
312
313 let mut network = NetworkLayer::new(transport);
314 let mut apdu_rx = network.start().await?;
315 let local_mac = MacAddr::from_slice(network.local_mac());
316
317 let network = Arc::new(network);
318
319 let tsm_config = TsmConfig {
320 apdu_timeout_ms: config.apdu_timeout_ms,
321 apdu_retries: config.apdu_retries,
322 };
323 let tsm = Arc::new(Mutex::new(Tsm::new(tsm_config)));
324 let tsm_dispatch = Arc::clone(&tsm);
325 let device_table = Arc::new(Mutex::new(DeviceTable::new()));
326 let device_table_dispatch = Arc::clone(&device_table);
327 let network_dispatch = Arc::clone(&network);
328 let (cov_tx, _) = broadcast::channel::<COVNotificationRequest>(64);
329 let cov_tx_dispatch = cov_tx.clone();
330 let seg_ack_senders: Arc<Mutex<HashMap<SegKey, mpsc::Sender<SegmentAckPdu>>>> =
331 Arc::new(Mutex::new(HashMap::new()));
332 let seg_ack_senders_dispatch = Arc::clone(&seg_ack_senders);
333
334 let dispatch_task = tokio::spawn(async move {
336 let mut seg_state: HashMap<SegKey, SegmentedReceiveState> = HashMap::new();
338
339 while let Some(received) = apdu_rx.recv().await {
340 let now = Instant::now();
342 seg_state.retain(|_key, state| {
343 now.duration_since(state.last_activity) < SEG_RECEIVER_TIMEOUT
344 });
345
346 match apdu::decode_apdu(received.apdu.clone()) {
347 Ok(decoded) => {
348 Self::dispatch_apdu(
349 &tsm_dispatch,
350 &device_table_dispatch,
351 &network_dispatch,
352 &cov_tx_dispatch,
353 &mut seg_state,
354 &seg_ack_senders_dispatch,
355 &received.source_mac,
356 decoded,
357 )
358 .await;
359 }
360 Err(e) => {
361 warn!(error = %e, "Failed to decode received APDU");
362 }
363 }
364 }
365 });
366
367 Ok(Self {
368 config,
369 network,
370 tsm,
371 device_table,
372 cov_tx,
373 dispatch_task: Some(dispatch_task),
374 seg_ack_senders,
375 local_mac,
376 })
377 }
378
379 #[allow(clippy::too_many_arguments)]
381 async fn dispatch_apdu(
382 tsm: &Arc<Mutex<Tsm>>,
383 device_table: &Arc<Mutex<DeviceTable>>,
384 network: &Arc<NetworkLayer<T>>,
385 cov_tx: &broadcast::Sender<COVNotificationRequest>,
386 seg_state: &mut HashMap<SegKey, SegmentedReceiveState>,
387 seg_ack_senders: &Arc<Mutex<HashMap<SegKey, mpsc::Sender<SegmentAckPdu>>>>,
388 source_mac: &[u8],
389 apdu: Apdu,
390 ) {
391 match apdu {
392 Apdu::SimpleAck(ack) => {
393 debug!(invoke_id = ack.invoke_id, "Received SimpleAck");
394 let mut tsm = tsm.lock().await;
395 tsm.complete_transaction(source_mac, ack.invoke_id, TsmResponse::SimpleAck);
396 }
397 Apdu::ComplexAck(ack) => {
398 if ack.segmented {
399 Self::handle_segmented_complex_ack(tsm, network, seg_state, source_mac, ack)
400 .await;
401 } else {
402 debug!(invoke_id = ack.invoke_id, "Received ComplexAck");
403 let mut tsm = tsm.lock().await;
404 tsm.complete_transaction(
405 source_mac,
406 ack.invoke_id,
407 TsmResponse::ComplexAck {
408 service_data: ack.service_ack,
409 },
410 );
411 }
412 }
413 Apdu::Error(err) => {
414 debug!(invoke_id = err.invoke_id, "Received Error PDU");
415 let mut tsm = tsm.lock().await;
416 tsm.complete_transaction(
417 source_mac,
418 err.invoke_id,
419 TsmResponse::Error {
420 class: err.error_class.to_raw() as u32,
421 code: err.error_code.to_raw() as u32,
422 },
423 );
424 }
425 Apdu::Reject(rej) => {
426 debug!(invoke_id = rej.invoke_id, "Received Reject PDU");
427 let mut tsm = tsm.lock().await;
428 tsm.complete_transaction(
429 source_mac,
430 rej.invoke_id,
431 TsmResponse::Reject {
432 reason: rej.reject_reason.to_raw(),
433 },
434 );
435 }
436 Apdu::Abort(abt) => {
437 debug!(invoke_id = abt.invoke_id, "Received Abort PDU");
438 let mut tsm = tsm.lock().await;
439 tsm.complete_transaction(
440 source_mac,
441 abt.invoke_id,
442 TsmResponse::Abort {
443 reason: abt.abort_reason.to_raw(),
444 },
445 );
446 }
447 Apdu::ConfirmedRequest(req) => {
448 if req.service_choice == ConfirmedServiceChoice::CONFIRMED_COV_NOTIFICATION {
450 match COVNotificationRequest::decode(&req.service_request) {
451 Ok(notification) => {
452 debug!(
453 object = ?notification.monitored_object_identifier,
454 "Received ConfirmedCOVNotification"
455 );
456 let _ = cov_tx.send(notification);
457
458 let ack = Apdu::SimpleAck(SimpleAck {
460 invoke_id: req.invoke_id,
461 service_choice: req.service_choice,
462 });
463 let mut buf = BytesMut::with_capacity(4);
464 encode_apdu(&mut buf, &ack);
465 if let Err(e) = network
466 .send_apdu(&buf, source_mac, false, NetworkPriority::NORMAL)
467 .await
468 {
469 warn!(error = %e, "Failed to send SimpleAck for COV notification");
470 }
471 }
472 Err(e) => {
473 warn!(error = %e, "Failed to decode ConfirmedCOVNotification");
474 }
475 }
476 } else {
477 debug!(
478 service = req.service_choice.to_raw(),
479 "Ignoring ConfirmedRequest (client mode)"
480 );
481 }
482 }
483 Apdu::UnconfirmedRequest(req) => {
484 if req.service_choice == UnconfirmedServiceChoice::I_AM {
485 match bacnet_services::who_is::IAmRequest::decode(&req.service_request) {
486 Ok(i_am) => {
487 debug!(
488 device = i_am.object_identifier.instance_number(),
489 vendor = i_am.vendor_id,
490 "Received IAm"
491 );
492 let device = DiscoveredDevice {
493 object_identifier: i_am.object_identifier,
494 mac_address: MacAddr::from_slice(source_mac),
495 max_apdu_length: i_am.max_apdu_length,
496 segmentation_supported: i_am.segmentation_supported,
497 max_segments_accepted: None,
498 vendor_id: i_am.vendor_id,
499 last_seen: std::time::Instant::now(),
500 };
501 device_table.lock().await.upsert(device);
502 }
503 Err(e) => {
504 warn!(error = %e, "Failed to decode IAm");
505 }
506 }
507 } else if req.service_choice
508 == UnconfirmedServiceChoice::UNCONFIRMED_COV_NOTIFICATION
509 {
510 match COVNotificationRequest::decode(&req.service_request) {
511 Ok(notification) => {
512 debug!(
513 object = ?notification.monitored_object_identifier,
514 "Received UnconfirmedCOVNotification"
515 );
516 let _ = cov_tx.send(notification);
517 }
518 Err(e) => {
519 warn!(error = %e, "Failed to decode UnconfirmedCOVNotification");
520 }
521 }
522 } else {
523 debug!(
524 service = req.service_choice.to_raw(),
525 "Ignoring unconfirmed service in client dispatch"
526 );
527 }
528 }
529 Apdu::SegmentAck(sa) => {
530 let key = (MacAddr::from_slice(source_mac), sa.invoke_id);
532 let senders = seg_ack_senders.lock().await;
533 if let Some(tx) = senders.get(&key) {
534 let _ = tx.try_send(sa);
535 } else {
536 debug!(
537 invoke_id = sa.invoke_id,
538 "Ignoring SegmentAck for unknown transaction"
539 );
540 }
541 }
542 }
543 }
544
545 async fn handle_segmented_complex_ack(
548 tsm: &Arc<Mutex<Tsm>>,
549 network: &Arc<NetworkLayer<T>>,
550 seg_state: &mut HashMap<SegKey, SegmentedReceiveState>,
551 source_mac: &[u8],
552 ack: bacnet_encoding::apdu::ComplexAck,
553 ) {
554 let seq = ack.sequence_number.unwrap_or(0);
555 let key = (MacAddr::from_slice(source_mac), ack.invoke_id);
556
557 debug!(
558 invoke_id = ack.invoke_id,
559 seq = seq,
560 more = ack.more_follows,
561 "Received segmented ComplexAck"
562 );
563
564 const MAX_CONCURRENT_SEG_SESSIONS: usize = 64;
566 if !seg_state.contains_key(&key) && seg_state.len() >= MAX_CONCURRENT_SEG_SESSIONS {
567 warn!(
568 invoke_id = ack.invoke_id,
569 sessions = seg_state.len(),
570 "Max concurrent segmented sessions reached, dropping segment"
571 );
572 return;
573 }
574
575 let state = seg_state
577 .entry(key.clone())
578 .or_insert_with(|| SegmentedReceiveState {
579 receiver: SegmentReceiver::new(),
580 expected_next_seq: 0,
581 last_activity: Instant::now(),
582 });
583
584 state.last_activity = Instant::now();
586
587 if seq != state.expected_next_seq {
591 warn!(
592 invoke_id = ack.invoke_id,
593 expected = state.expected_next_seq,
594 received = seq,
595 "Segment gap detected, sending negative SegmentAck"
596 );
597 let neg_ack = Apdu::SegmentAck(SegmentAckPdu {
598 negative_ack: true,
599 sent_by_server: false,
600 invoke_id: ack.invoke_id,
601 sequence_number: state.expected_next_seq,
602 actual_window_size: ack.proposed_window_size.unwrap_or(1),
603 });
604 let mut buf = BytesMut::with_capacity(4);
605 encode_apdu(&mut buf, &neg_ack);
606 if let Err(e) = network
607 .send_apdu(&buf, source_mac, false, NetworkPriority::NORMAL)
608 .await
609 {
610 warn!(error = %e, "Failed to send negative SegmentAck");
611 }
612 return;
613 }
614
615 if let Err(e) = state.receiver.receive(seq, ack.service_ack) {
617 warn!(error = %e, "Rejecting oversized segment");
618 return;
619 }
620 state.expected_next_seq = seq.wrapping_add(1);
621
622 let seg_ack = Apdu::SegmentAck(SegmentAckPdu {
624 negative_ack: false,
625 sent_by_server: false,
626 invoke_id: ack.invoke_id,
627 sequence_number: seq,
628 actual_window_size: ack.proposed_window_size.unwrap_or(1),
629 });
630 let mut buf = BytesMut::with_capacity(4);
631 encode_apdu(&mut buf, &seg_ack);
632 if let Err(e) = network
633 .send_apdu(&buf, source_mac, false, NetworkPriority::NORMAL)
634 .await
635 {
636 warn!(error = %e, "Failed to send SegmentAck");
637 }
638
639 if !ack.more_follows {
643 let state = seg_state.remove(&key).unwrap();
644 let total = state.receiver.received_count();
645 match state.receiver.reassemble(total) {
646 Ok(service_data) => {
647 debug!(
648 invoke_id = ack.invoke_id,
649 segments = total,
650 bytes = service_data.len(),
651 "Reassembled segmented ComplexAck"
652 );
653 let mut tsm = tsm.lock().await;
654 tsm.complete_transaction(
655 source_mac,
656 ack.invoke_id,
657 TsmResponse::ComplexAck {
658 service_data: Bytes::from(service_data),
659 },
660 );
661 }
662 Err(e) => {
663 warn!(error = %e, "Failed to reassemble segmented ComplexAck");
664 }
665 }
666 }
667 }
668
669 pub fn local_mac(&self) -> &[u8] {
671 &self.local_mac
672 }
673
674 pub async fn confirmed_request(
686 &self,
687 destination_mac: &[u8],
688 service_choice: ConfirmedServiceChoice,
689 service_data: &[u8],
690 ) -> Result<Bytes, Error> {
691 let unsegmented_apdu_size = 4 + service_data.len();
694 let (remote_max_apdu, remote_max_segments) = {
695 let dt = self.device_table.lock().await;
696 let device = dt.get_by_mac(destination_mac);
697 let max_apdu = device
698 .map(|d| d.max_apdu_length as u16)
699 .unwrap_or(self.config.max_apdu_length);
700 let max_seg = device.and_then(|d| d.max_segments_accepted);
701 (max_apdu, max_seg)
702 };
703 if unsegmented_apdu_size > remote_max_apdu as usize {
704 return self
705 .segmented_confirmed_request(
706 destination_mac,
707 service_choice,
708 service_data,
709 remote_max_apdu,
710 remote_max_segments,
711 )
712 .await;
713 }
714
715 let (invoke_id, rx) = {
717 let mut tsm = self.tsm.lock().await;
718 let invoke_id = tsm.allocate_invoke_id(destination_mac).ok_or_else(|| {
719 Error::Encoding("all invoke IDs exhausted for destination".into())
720 })?;
721 let rx = tsm.register_transaction(MacAddr::from_slice(destination_mac), invoke_id);
722 (invoke_id, rx)
723 };
724
725 let pdu = Apdu::ConfirmedRequest(ConfirmedRequestPdu {
727 segmented: false,
728 more_follows: false,
729 segmented_response_accepted: self.config.segmented_response_accepted,
730 max_segments: self.config.max_segments,
731 max_apdu_length: self.config.max_apdu_length,
732 invoke_id,
733 sequence_number: None,
734 proposed_window_size: None,
735 service_choice,
736 service_request: Bytes::copy_from_slice(service_data),
737 });
738
739 let mut buf = BytesMut::with_capacity(6 + service_data.len());
740 encode_apdu(&mut buf, &pdu);
741
742 let timeout_duration = Duration::from_millis(self.config.apdu_timeout_ms);
746 let max_retries = self.config.apdu_retries;
747 let mut attempts: u8 = 0;
748 let mut rx = rx;
749
750 loop {
751 if let Err(e) = self
752 .network
753 .send_apdu(&buf, destination_mac, true, NetworkPriority::NORMAL)
754 .await
755 {
756 let mut tsm = self.tsm.lock().await;
758 tsm.cancel_transaction(destination_mac, invoke_id);
759 return Err(e);
760 }
761
762 match timeout(timeout_duration, &mut rx).await {
763 Ok(Ok(response)) => {
764 return match response {
766 TsmResponse::SimpleAck => Ok(Bytes::new()),
767 TsmResponse::ComplexAck { service_data } => Ok(service_data),
768 TsmResponse::Error { class, code } => Err(Error::Protocol { class, code }),
769 TsmResponse::Reject { reason } => Err(Error::Reject { reason }),
770 TsmResponse::Abort { reason } => Err(Error::Abort { reason }),
771 };
772 }
773 Ok(Err(_)) => {
774 return Err(Error::Encoding("TSM response channel closed".into()));
776 }
777 Err(_timeout) => {
778 attempts += 1;
779 if attempts > max_retries {
780 let mut tsm = self.tsm.lock().await;
782 tsm.cancel_transaction(destination_mac, invoke_id);
783 return Err(Error::Timeout(timeout_duration));
784 }
785 debug!(
786 invoke_id,
787 attempt = attempts,
788 max_retries,
789 "APDU timeout, retrying confirmed request"
790 );
791 }
792 }
793 }
794 }
795
796 async fn segmented_confirmed_request(
801 &self,
802 destination_mac: &[u8],
803 service_choice: ConfirmedServiceChoice,
804 service_data: &[u8],
805 remote_max_apdu: u16,
806 remote_max_segments: Option<u32>,
807 ) -> Result<Bytes, Error> {
808 let max_seg_size = max_segment_payload(remote_max_apdu, SegmentedPduType::ConfirmedRequest);
809 let segments = split_payload(service_data, max_seg_size);
810 let total_segments = segments.len();
811
812 if total_segments > 255 {
813 return Err(Error::Segmentation(format!(
814 "payload requires {} segments, maximum is 255",
815 total_segments
816 )));
817 }
818
819 if let Some(max_seg) = remote_max_segments {
820 if total_segments > max_seg as usize {
821 return Err(Error::Segmentation(format!(
822 "request requires {} segments but remote accepts at most {}",
823 total_segments, max_seg
824 )));
825 }
826 }
827
828 debug!(
829 total_segments,
830 max_seg_size,
831 service_data_len = service_data.len(),
832 "Starting segmented confirmed request"
833 );
834
835 let (invoke_id, rx) = {
837 let mut tsm = self.tsm.lock().await;
838 let invoke_id = tsm.allocate_invoke_id(destination_mac).ok_or_else(|| {
839 Error::Encoding("all invoke IDs exhausted for destination".into())
840 })?;
841 let rx = tsm.register_transaction(MacAddr::from_slice(destination_mac), invoke_id);
842 (invoke_id, rx)
843 };
844
845 let (seg_ack_tx, mut seg_ack_rx) = mpsc::channel(16);
847 {
848 let key = (MacAddr::from_slice(destination_mac), invoke_id);
849 self.seg_ack_senders.lock().await.insert(key, seg_ack_tx);
850 }
851
852 let timeout_duration = Duration::from_millis(self.config.apdu_timeout_ms);
853 let max_ack_retries = self.config.apdu_retries;
854 let mut window_size = self.config.proposed_window_size.max(1) as usize;
855 let mut next_seq: usize = 0;
856 let mut neg_ack_retries: u32 = 0;
857 const MAX_NEG_ACK_RETRIES: u32 = 10;
858
859 let result = async {
861 while next_seq < total_segments {
862 let window_end = (next_seq + window_size).min(total_segments);
863
864 for (seq, segment_data) in segments[next_seq..window_end]
865 .iter()
866 .enumerate()
867 .map(|(i, s)| (next_seq + i, s))
868 {
869 let is_last = seq == total_segments - 1;
870 let pdu = Apdu::ConfirmedRequest(ConfirmedRequestPdu {
871 segmented: true,
872 more_follows: !is_last,
873 segmented_response_accepted: self.config.segmented_response_accepted,
874 max_segments: self.config.max_segments,
875 max_apdu_length: self.config.max_apdu_length,
876 invoke_id,
877 sequence_number: Some(seq as u8),
878 proposed_window_size: Some(self.config.proposed_window_size.max(1)),
879 service_choice,
880 service_request: segment_data.clone(),
881 });
882
883 let mut buf = BytesMut::with_capacity(remote_max_apdu as usize);
884 encode_apdu(&mut buf, &pdu);
885
886 self.network
887 .send_apdu(&buf, destination_mac, true, NetworkPriority::NORMAL)
888 .await?;
889
890 debug!(seq, is_last, "Sent segment");
891 }
892
893 let ack = {
895 let mut ack_retries: u8 = 0;
896 loop {
897 match timeout(timeout_duration, seg_ack_rx.recv()).await {
898 Ok(Some(ack)) => break ack,
899 Ok(None) => {
900 return Err(Error::Encoding("SegmentAck channel closed".into()));
901 }
902 Err(_timeout) => {
903 ack_retries += 1;
904 if ack_retries > max_ack_retries {
905 return Err(Error::Timeout(timeout_duration));
906 }
907 warn!(
908 attempt = ack_retries,
909 "Retransmitting segmented request window"
910 );
911 for (seq, segment_data) in segments[next_seq..window_end]
913 .iter()
914 .enumerate()
915 .map(|(i, s)| (next_seq + i, s))
916 {
917 let is_last = seq == total_segments - 1;
918 let pdu = Apdu::ConfirmedRequest(ConfirmedRequestPdu {
919 segmented: true,
920 more_follows: !is_last,
921 segmented_response_accepted: self
922 .config
923 .segmented_response_accepted,
924 max_segments: self.config.max_segments,
925 max_apdu_length: self.config.max_apdu_length,
926 invoke_id,
927 sequence_number: Some(seq as u8),
928 proposed_window_size: Some(
929 self.config.proposed_window_size.max(1),
930 ),
931 service_choice,
932 service_request: segment_data.clone(),
933 });
934
935 let mut buf = BytesMut::with_capacity(remote_max_apdu as usize);
936 encode_apdu(&mut buf, &pdu);
937
938 self.network
939 .send_apdu(
940 &buf,
941 destination_mac,
942 true,
943 NetworkPriority::NORMAL,
944 )
945 .await?;
946 }
947 }
948 }
949 }
950 };
951
952 debug!(
953 seq = ack.sequence_number,
954 negative = ack.negative_ack,
955 window = ack.actual_window_size,
956 "Received SegmentAck"
957 );
958
959 window_size = ack.actual_window_size.max(1) as usize;
961
962 if ack.negative_ack {
963 neg_ack_retries += 1;
964 if neg_ack_retries > MAX_NEG_ACK_RETRIES {
965 return Err(Error::Segmentation(
966 "too many negative SegmentAck retransmissions".into(),
967 ));
968 }
969 next_seq = ack.sequence_number as usize;
971 } else {
972 neg_ack_retries = 0;
973 next_seq = ack.sequence_number as usize + 1;
975 }
976 }
977
978 timeout(timeout_duration, rx)
980 .await
981 .map_err(|_| Error::Timeout(timeout_duration))?
982 .map_err(|_| Error::Encoding("TSM response channel closed".into()))
983 }
984 .await;
985
986 {
988 let key = (MacAddr::from_slice(destination_mac), invoke_id);
989 self.seg_ack_senders.lock().await.remove(&key);
990 }
991
992 let response = match result {
994 Ok(response) => response,
995 Err(e) => {
996 let mut tsm = self.tsm.lock().await;
997 tsm.cancel_transaction(destination_mac, invoke_id);
998 return Err(e);
999 }
1000 };
1001
1002 match response {
1003 TsmResponse::SimpleAck => Ok(Bytes::new()),
1004 TsmResponse::ComplexAck { service_data } => Ok(service_data),
1005 TsmResponse::Error { class, code } => Err(Error::Protocol { class, code }),
1006 TsmResponse::Reject { reason } => Err(Error::Reject { reason }),
1007 TsmResponse::Abort { reason } => Err(Error::Abort { reason }),
1008 }
1009 }
1010
1011 pub async fn unconfirmed_request(
1013 &self,
1014 destination_mac: &[u8],
1015 service_choice: UnconfirmedServiceChoice,
1016 service_data: &[u8],
1017 ) -> Result<(), Error> {
1018 let pdu = Apdu::UnconfirmedRequest(bacnet_encoding::apdu::UnconfirmedRequest {
1019 service_choice,
1020 service_request: Bytes::copy_from_slice(service_data),
1021 });
1022
1023 let mut buf = BytesMut::with_capacity(2 + service_data.len());
1024 encode_apdu(&mut buf, &pdu);
1025
1026 self.network
1027 .send_apdu(&buf, destination_mac, false, NetworkPriority::NORMAL)
1028 .await
1029 }
1030
1031 pub async fn broadcast_unconfirmed(
1033 &self,
1034 service_choice: UnconfirmedServiceChoice,
1035 service_data: &[u8],
1036 ) -> Result<(), Error> {
1037 let pdu = Apdu::UnconfirmedRequest(bacnet_encoding::apdu::UnconfirmedRequest {
1038 service_choice,
1039 service_request: Bytes::copy_from_slice(service_data),
1040 });
1041
1042 let mut buf = BytesMut::with_capacity(2 + service_data.len());
1043 encode_apdu(&mut buf, &pdu);
1044
1045 self.network
1046 .broadcast_apdu(&buf, false, NetworkPriority::NORMAL)
1047 .await
1048 }
1049
1050 pub async fn broadcast_global_unconfirmed(
1055 &self,
1056 service_choice: UnconfirmedServiceChoice,
1057 service_data: &[u8],
1058 ) -> Result<(), Error> {
1059 let pdu = Apdu::UnconfirmedRequest(bacnet_encoding::apdu::UnconfirmedRequest {
1060 service_choice,
1061 service_request: Bytes::copy_from_slice(service_data),
1062 });
1063
1064 let mut buf = BytesMut::with_capacity(2 + service_data.len());
1065 encode_apdu(&mut buf, &pdu);
1066
1067 self.network
1068 .broadcast_global_apdu(&buf, false, NetworkPriority::NORMAL)
1069 .await
1070 }
1071
1072 pub async fn read_property(
1078 &self,
1079 destination_mac: &[u8],
1080 object_identifier: bacnet_types::primitives::ObjectIdentifier,
1081 property_identifier: bacnet_types::enums::PropertyIdentifier,
1082 property_array_index: Option<u32>,
1083 ) -> Result<bacnet_services::read_property::ReadPropertyACK, Error> {
1084 use bacnet_services::read_property::ReadPropertyRequest;
1085
1086 let request = ReadPropertyRequest {
1087 object_identifier,
1088 property_identifier,
1089 property_array_index,
1090 };
1091 let mut buf = BytesMut::new();
1092 request.encode(&mut buf);
1093
1094 let response_data = self
1095 .confirmed_request(destination_mac, ConfirmedServiceChoice::READ_PROPERTY, &buf)
1096 .await?;
1097
1098 bacnet_services::read_property::ReadPropertyACK::decode(&response_data)
1099 }
1100
1101 pub async fn write_property(
1103 &self,
1104 destination_mac: &[u8],
1105 object_identifier: bacnet_types::primitives::ObjectIdentifier,
1106 property_identifier: bacnet_types::enums::PropertyIdentifier,
1107 property_array_index: Option<u32>,
1108 property_value: Vec<u8>,
1109 priority: Option<u8>,
1110 ) -> Result<(), Error> {
1111 use bacnet_services::write_property::WritePropertyRequest;
1112
1113 let request = WritePropertyRequest {
1114 object_identifier,
1115 property_identifier,
1116 property_array_index,
1117 property_value,
1118 priority,
1119 };
1120 let mut buf = BytesMut::new();
1121 request.encode(&mut buf);
1122
1123 let _ = self
1124 .confirmed_request(
1125 destination_mac,
1126 ConfirmedServiceChoice::WRITE_PROPERTY,
1127 &buf,
1128 )
1129 .await?;
1130
1131 Ok(())
1132 }
1133
1134 pub async fn read_property_multiple(
1136 &self,
1137 destination_mac: &[u8],
1138 specs: Vec<bacnet_services::rpm::ReadAccessSpecification>,
1139 ) -> Result<bacnet_services::rpm::ReadPropertyMultipleACK, Error> {
1140 use bacnet_services::rpm::{ReadPropertyMultipleACK, ReadPropertyMultipleRequest};
1141
1142 let request = ReadPropertyMultipleRequest {
1143 list_of_read_access_specs: specs,
1144 };
1145 let mut buf = BytesMut::new();
1146 request.encode(&mut buf);
1147
1148 let response_data = self
1149 .confirmed_request(
1150 destination_mac,
1151 ConfirmedServiceChoice::READ_PROPERTY_MULTIPLE,
1152 &buf,
1153 )
1154 .await?;
1155
1156 ReadPropertyMultipleACK::decode(&response_data)
1157 }
1158
1159 pub async fn write_property_multiple(
1161 &self,
1162 destination_mac: &[u8],
1163 specs: Vec<bacnet_services::wpm::WriteAccessSpecification>,
1164 ) -> Result<(), Error> {
1165 use bacnet_services::wpm::WritePropertyMultipleRequest;
1166
1167 let request = WritePropertyMultipleRequest {
1168 list_of_write_access_specs: specs,
1169 };
1170 let mut buf = BytesMut::new();
1171 request.encode(&mut buf);
1172
1173 let _ = self
1174 .confirmed_request(
1175 destination_mac,
1176 ConfirmedServiceChoice::WRITE_PROPERTY_MULTIPLE,
1177 &buf,
1178 )
1179 .await?;
1180
1181 Ok(())
1182 }
1183
1184 pub async fn who_is(
1186 &self,
1187 low_limit: Option<u32>,
1188 high_limit: Option<u32>,
1189 ) -> Result<(), Error> {
1190 use bacnet_services::who_is::WhoIsRequest;
1191
1192 let request = WhoIsRequest {
1193 low_limit,
1194 high_limit,
1195 };
1196 let mut buf = BytesMut::new();
1197 request.encode(&mut buf);
1198
1199 self.broadcast_global_unconfirmed(UnconfirmedServiceChoice::WHO_IS, &buf)
1200 .await
1201 }
1202
1203 pub async fn who_has(
1205 &self,
1206 object: bacnet_services::who_has::WhoHasObject,
1207 low_limit: Option<u32>,
1208 high_limit: Option<u32>,
1209 ) -> Result<(), Error> {
1210 use bacnet_services::who_has::WhoHasRequest;
1211
1212 let request = WhoHasRequest {
1213 low_limit,
1214 high_limit,
1215 object,
1216 };
1217 let mut buf = BytesMut::new();
1218 request.encode(&mut buf)?;
1219
1220 self.broadcast_unconfirmed(UnconfirmedServiceChoice::WHO_HAS, &buf)
1221 .await
1222 }
1223
1224 pub async fn subscribe_cov(
1226 &self,
1227 destination_mac: &[u8],
1228 subscriber_process_identifier: u32,
1229 monitored_object_identifier: bacnet_types::primitives::ObjectIdentifier,
1230 confirmed: bool,
1231 lifetime: Option<u32>,
1232 ) -> Result<(), Error> {
1233 use bacnet_services::cov::SubscribeCOVRequest;
1234
1235 let request = SubscribeCOVRequest {
1236 subscriber_process_identifier,
1237 monitored_object_identifier,
1238 issue_confirmed_notifications: Some(confirmed),
1239 lifetime,
1240 };
1241 let mut buf = BytesMut::new();
1242 request.encode(&mut buf);
1243
1244 let _ = self
1245 .confirmed_request(destination_mac, ConfirmedServiceChoice::SUBSCRIBE_COV, &buf)
1246 .await?;
1247
1248 Ok(())
1249 }
1250
1251 pub async fn unsubscribe_cov(
1253 &self,
1254 destination_mac: &[u8],
1255 subscriber_process_identifier: u32,
1256 monitored_object_identifier: bacnet_types::primitives::ObjectIdentifier,
1257 ) -> Result<(), Error> {
1258 use bacnet_services::cov::SubscribeCOVRequest;
1259
1260 let request = SubscribeCOVRequest {
1261 subscriber_process_identifier,
1262 monitored_object_identifier,
1263 issue_confirmed_notifications: None,
1264 lifetime: None,
1265 };
1266 let mut buf = BytesMut::new();
1267 request.encode(&mut buf);
1268
1269 let _ = self
1270 .confirmed_request(destination_mac, ConfirmedServiceChoice::SUBSCRIBE_COV, &buf)
1271 .await?;
1272
1273 Ok(())
1274 }
1275
1276 pub async fn delete_object(
1278 &self,
1279 destination_mac: &[u8],
1280 object_identifier: bacnet_types::primitives::ObjectIdentifier,
1281 ) -> Result<(), Error> {
1282 use bacnet_services::object_mgmt::DeleteObjectRequest;
1283
1284 let request = DeleteObjectRequest { object_identifier };
1285 let mut buf = BytesMut::new();
1286 request.encode(&mut buf);
1287
1288 let _ = self
1289 .confirmed_request(destination_mac, ConfirmedServiceChoice::DELETE_OBJECT, &buf)
1290 .await?;
1291
1292 Ok(())
1293 }
1294
1295 pub async fn create_object(
1297 &self,
1298 destination_mac: &[u8],
1299 object_specifier: bacnet_services::object_mgmt::ObjectSpecifier,
1300 initial_values: Vec<bacnet_services::common::BACnetPropertyValue>,
1301 ) -> Result<Bytes, Error> {
1302 use bacnet_services::object_mgmt::CreateObjectRequest;
1303
1304 let request = CreateObjectRequest {
1305 object_specifier,
1306 list_of_initial_values: initial_values,
1307 };
1308 let mut buf = BytesMut::new();
1309 request.encode(&mut buf);
1310
1311 self.confirmed_request(destination_mac, ConfirmedServiceChoice::CREATE_OBJECT, &buf)
1312 .await
1313 }
1314
1315 pub async fn device_communication_control(
1317 &self,
1318 destination_mac: &[u8],
1319 enable_disable: bacnet_types::enums::EnableDisable,
1320 time_duration: Option<u16>,
1321 password: Option<String>,
1322 ) -> Result<(), Error> {
1323 use bacnet_services::device_mgmt::DeviceCommunicationControlRequest;
1324
1325 let request = DeviceCommunicationControlRequest {
1326 time_duration,
1327 enable_disable,
1328 password,
1329 };
1330 let mut buf = BytesMut::new();
1331 request.encode(&mut buf)?;
1332
1333 let _ = self
1334 .confirmed_request(
1335 destination_mac,
1336 ConfirmedServiceChoice::DEVICE_COMMUNICATION_CONTROL,
1337 &buf,
1338 )
1339 .await?;
1340
1341 Ok(())
1342 }
1343
1344 pub async fn reinitialize_device(
1346 &self,
1347 destination_mac: &[u8],
1348 reinitialized_state: bacnet_types::enums::ReinitializedState,
1349 password: Option<String>,
1350 ) -> Result<(), Error> {
1351 use bacnet_services::device_mgmt::ReinitializeDeviceRequest;
1352
1353 let request = ReinitializeDeviceRequest {
1354 reinitialized_state,
1355 password,
1356 };
1357 let mut buf = BytesMut::new();
1358 request.encode(&mut buf)?;
1359
1360 let _ = self
1361 .confirmed_request(
1362 destination_mac,
1363 ConfirmedServiceChoice::REINITIALIZE_DEVICE,
1364 &buf,
1365 )
1366 .await?;
1367
1368 Ok(())
1369 }
1370
1371 pub async fn get_event_information(
1373 &self,
1374 destination_mac: &[u8],
1375 last_received_object_identifier: Option<bacnet_types::primitives::ObjectIdentifier>,
1376 ) -> Result<Bytes, Error> {
1377 use bacnet_services::alarm_event::GetEventInformationRequest;
1378
1379 let request = GetEventInformationRequest {
1380 last_received_object_identifier,
1381 };
1382 let mut buf = BytesMut::new();
1383 request.encode(&mut buf);
1384
1385 self.confirmed_request(
1386 destination_mac,
1387 ConfirmedServiceChoice::GET_EVENT_INFORMATION,
1388 &buf,
1389 )
1390 .await
1391 }
1392
1393 pub async fn acknowledge_alarm(
1395 &self,
1396 destination_mac: &[u8],
1397 acknowledging_process_identifier: u32,
1398 event_object_identifier: bacnet_types::primitives::ObjectIdentifier,
1399 event_state_acknowledged: u32,
1400 acknowledgment_source: &str,
1401 ) -> Result<(), Error> {
1402 use bacnet_services::alarm_event::AcknowledgeAlarmRequest;
1403
1404 let request = AcknowledgeAlarmRequest {
1405 acknowledging_process_identifier,
1406 event_object_identifier,
1407 event_state_acknowledged,
1408 timestamp: bacnet_types::primitives::BACnetTimeStamp::SequenceNumber(0),
1409 acknowledgment_source: acknowledgment_source.to_string(),
1410 };
1411 let mut buf = BytesMut::new();
1412 request.encode(&mut buf)?;
1413
1414 let _ = self
1415 .confirmed_request(
1416 destination_mac,
1417 ConfirmedServiceChoice::ACKNOWLEDGE_ALARM,
1418 &buf,
1419 )
1420 .await?;
1421
1422 Ok(())
1423 }
1424
1425 pub async fn read_range(
1427 &self,
1428 destination_mac: &[u8],
1429 object_identifier: bacnet_types::primitives::ObjectIdentifier,
1430 property_identifier: bacnet_types::enums::PropertyIdentifier,
1431 property_array_index: Option<u32>,
1432 range: Option<bacnet_services::read_range::RangeSpec>,
1433 ) -> Result<bacnet_services::read_range::ReadRangeAck, Error> {
1434 use bacnet_services::read_range::{ReadRangeAck, ReadRangeRequest};
1435
1436 let request = ReadRangeRequest {
1437 object_identifier,
1438 property_identifier,
1439 property_array_index,
1440 range,
1441 };
1442 let mut buf = BytesMut::new();
1443 request.encode(&mut buf);
1444
1445 let response_data = self
1446 .confirmed_request(destination_mac, ConfirmedServiceChoice::READ_RANGE, &buf)
1447 .await?;
1448
1449 ReadRangeAck::decode(&response_data)
1450 }
1451
1452 pub async fn atomic_read_file(
1454 &self,
1455 destination_mac: &[u8],
1456 file_identifier: bacnet_types::primitives::ObjectIdentifier,
1457 access: bacnet_services::file::FileAccessMethod,
1458 ) -> Result<Bytes, Error> {
1459 use bacnet_services::file::AtomicReadFileRequest;
1460
1461 let request = AtomicReadFileRequest {
1462 file_identifier,
1463 access,
1464 };
1465 let mut buf = BytesMut::new();
1466 request.encode(&mut buf);
1467
1468 self.confirmed_request(
1469 destination_mac,
1470 ConfirmedServiceChoice::ATOMIC_READ_FILE,
1471 &buf,
1472 )
1473 .await
1474 }
1475
1476 pub async fn atomic_write_file(
1478 &self,
1479 destination_mac: &[u8],
1480 file_identifier: bacnet_types::primitives::ObjectIdentifier,
1481 access: bacnet_services::file::FileWriteAccessMethod,
1482 ) -> Result<Bytes, Error> {
1483 use bacnet_services::file::AtomicWriteFileRequest;
1484
1485 let request = AtomicWriteFileRequest {
1486 file_identifier,
1487 access,
1488 };
1489 let mut buf = BytesMut::new();
1490 request.encode(&mut buf);
1491
1492 self.confirmed_request(
1493 destination_mac,
1494 ConfirmedServiceChoice::ATOMIC_WRITE_FILE,
1495 &buf,
1496 )
1497 .await
1498 }
1499
1500 pub async fn add_list_element(
1502 &self,
1503 destination_mac: &[u8],
1504 object_identifier: bacnet_types::primitives::ObjectIdentifier,
1505 property_identifier: bacnet_types::enums::PropertyIdentifier,
1506 property_array_index: Option<u32>,
1507 list_of_elements: Vec<u8>,
1508 ) -> Result<(), Error> {
1509 use bacnet_services::list_manipulation::ListElementRequest;
1510
1511 let request = ListElementRequest {
1512 object_identifier,
1513 property_identifier,
1514 property_array_index,
1515 list_of_elements,
1516 };
1517 let mut buf = BytesMut::new();
1518 request.encode(&mut buf);
1519
1520 let _ = self
1521 .confirmed_request(
1522 destination_mac,
1523 ConfirmedServiceChoice::ADD_LIST_ELEMENT,
1524 &buf,
1525 )
1526 .await?;
1527
1528 Ok(())
1529 }
1530
1531 pub async fn remove_list_element(
1533 &self,
1534 destination_mac: &[u8],
1535 object_identifier: bacnet_types::primitives::ObjectIdentifier,
1536 property_identifier: bacnet_types::enums::PropertyIdentifier,
1537 property_array_index: Option<u32>,
1538 list_of_elements: Vec<u8>,
1539 ) -> Result<(), Error> {
1540 use bacnet_services::list_manipulation::ListElementRequest;
1541
1542 let request = ListElementRequest {
1543 object_identifier,
1544 property_identifier,
1545 property_array_index,
1546 list_of_elements,
1547 };
1548 let mut buf = BytesMut::new();
1549 request.encode(&mut buf);
1550
1551 let _ = self
1552 .confirmed_request(
1553 destination_mac,
1554 ConfirmedServiceChoice::REMOVE_LIST_ELEMENT,
1555 &buf,
1556 )
1557 .await?;
1558
1559 Ok(())
1560 }
1561
1562 pub fn cov_notifications(&self) -> broadcast::Receiver<COVNotificationRequest> {
1567 self.cov_tx.subscribe()
1568 }
1569
1570 pub async fn discovered_devices(&self) -> Vec<DiscoveredDevice> {
1576 self.device_table.lock().await.all()
1577 }
1578
1579 pub async fn get_device(&self, instance: u32) -> Option<DiscoveredDevice> {
1581 self.device_table.lock().await.get(instance).cloned()
1582 }
1583
1584 pub async fn clear_devices(&self) {
1586 self.device_table.lock().await.clear();
1587 }
1588
1589 pub async fn stop(&mut self) -> Result<(), Error> {
1591 if let Some(task) = self.dispatch_task.take() {
1592 task.abort();
1593 let _ = task.await;
1594 }
1595 Ok(())
1597 }
1598}
1599
1600#[cfg(test)]
1601mod tests {
1602 use super::*;
1603 use bacnet_encoding::apdu::{ComplexAck, SimpleAck};
1604 use std::net::Ipv4Addr;
1605 use tokio::time::Duration;
1606
1607 async fn make_client() -> BACnetClient<BipTransport> {
1609 BACnetClient::builder()
1610 .interface(Ipv4Addr::LOCALHOST)
1611 .port(0)
1612 .apdu_timeout_ms(2000)
1613 .build()
1614 .await
1615 .unwrap()
1616 }
1617
1618 #[tokio::test]
1619 async fn client_start_stop() {
1620 let mut client = make_client().await;
1621 assert!(!client.local_mac().is_empty());
1622 client.stop().await.unwrap();
1623 }
1624
1625 #[tokio::test]
1626 async fn confirmed_request_simple_ack() {
1627 let mut client_a = make_client().await;
1628
1629 let transport_b = BipTransport::new(Ipv4Addr::LOCALHOST, 0, Ipv4Addr::BROADCAST);
1631 let mut net_b = NetworkLayer::new(transport_b);
1632 let mut rx_b = net_b.start().await.unwrap();
1633 let b_mac = net_b.local_mac().to_vec();
1634
1635 let b_handle = tokio::spawn(async move {
1637 let received = timeout(Duration::from_secs(2), rx_b.recv())
1638 .await
1639 .expect("B timed out")
1640 .expect("B channel closed");
1641
1642 let decoded = apdu::decode_apdu(received.apdu.clone()).unwrap();
1643 if let Apdu::ConfirmedRequest(req) = decoded {
1644 let ack = Apdu::SimpleAck(SimpleAck {
1645 invoke_id: req.invoke_id,
1646 service_choice: req.service_choice,
1647 });
1648 let mut buf = BytesMut::new();
1649 encode_apdu(&mut buf, &ack);
1650 net_b
1651 .send_apdu(&buf, &received.source_mac, false, NetworkPriority::NORMAL)
1652 .await
1653 .unwrap();
1654 }
1655 net_b.stop().await.unwrap();
1656 });
1657
1658 let result = client_a
1659 .confirmed_request(
1660 &b_mac,
1661 ConfirmedServiceChoice::WRITE_PROPERTY,
1662 &[0x01, 0x02],
1663 )
1664 .await;
1665
1666 assert!(result.is_ok());
1667 let response = result.unwrap();
1668 assert!(response.is_empty()); b_handle.await.unwrap();
1671 client_a.stop().await.unwrap();
1672 }
1673
1674 #[tokio::test]
1675 async fn confirmed_request_complex_ack() {
1676 let mut client_a = make_client().await;
1677
1678 let transport_b = BipTransport::new(Ipv4Addr::LOCALHOST, 0, Ipv4Addr::BROADCAST);
1679 let mut net_b = NetworkLayer::new(transport_b);
1680 let mut rx_b = net_b.start().await.unwrap();
1681 let b_mac = net_b.local_mac().to_vec();
1682
1683 let b_handle = tokio::spawn(async move {
1684 let received = timeout(Duration::from_secs(2), rx_b.recv())
1685 .await
1686 .unwrap()
1687 .unwrap();
1688
1689 let decoded = apdu::decode_apdu(received.apdu.clone()).unwrap();
1690 if let Apdu::ConfirmedRequest(req) = decoded {
1691 let ack = Apdu::ComplexAck(ComplexAck {
1692 segmented: false,
1693 more_follows: false,
1694 invoke_id: req.invoke_id,
1695 sequence_number: None,
1696 proposed_window_size: None,
1697 service_choice: req.service_choice,
1698 service_ack: Bytes::from_static(&[0xDE, 0xAD, 0xBE, 0xEF]),
1699 });
1700 let mut buf = BytesMut::new();
1701 encode_apdu(&mut buf, &ack);
1702 net_b
1703 .send_apdu(&buf, &received.source_mac, false, NetworkPriority::NORMAL)
1704 .await
1705 .unwrap();
1706 }
1707 net_b.stop().await.unwrap();
1708 });
1709
1710 let result = client_a
1711 .confirmed_request(&b_mac, ConfirmedServiceChoice::READ_PROPERTY, &[0x01])
1712 .await;
1713
1714 assert!(result.is_ok());
1715 assert_eq!(result.unwrap(), vec![0xDE, 0xAD, 0xBE, 0xEF]);
1716
1717 b_handle.await.unwrap();
1718 client_a.stop().await.unwrap();
1719 }
1720
1721 #[tokio::test]
1722 async fn confirmed_request_timeout() {
1723 let mut client = make_client().await;
1724 let fake_mac = vec![10, 99, 99, 99, 0xBA, 0xC0];
1726 let result = client
1727 .confirmed_request(&fake_mac, ConfirmedServiceChoice::READ_PROPERTY, &[0x01])
1728 .await;
1729 assert!(result.is_err());
1730 client.stop().await.unwrap();
1731 }
1732
1733 #[tokio::test]
1734 async fn segmented_complex_ack_reassembly() {
1735 let mut client = make_client().await;
1736
1737 let transport_b = BipTransport::new(Ipv4Addr::LOCALHOST, 0, Ipv4Addr::BROADCAST);
1738 let mut net_b = NetworkLayer::new(transport_b);
1739 let mut rx_b = net_b.start().await.unwrap();
1740 let b_mac = net_b.local_mac().to_vec();
1741
1742 let b_handle = tokio::spawn(async move {
1745 let received = timeout(Duration::from_secs(2), rx_b.recv())
1747 .await
1748 .unwrap()
1749 .unwrap();
1750
1751 let decoded = apdu::decode_apdu(received.apdu.clone()).unwrap();
1752 let invoke_id = if let Apdu::ConfirmedRequest(req) = decoded {
1753 req.invoke_id
1754 } else {
1755 panic!("Expected ConfirmedRequest");
1756 };
1757
1758 let service_choice = ConfirmedServiceChoice::READ_PROPERTY;
1759 let segments: Vec<Bytes> = vec![
1760 Bytes::from_static(&[0x01, 0x02, 0x03]),
1761 Bytes::from_static(&[0x04, 0x05, 0x06]),
1762 Bytes::from_static(&[0x07, 0x08]),
1763 ];
1764
1765 for (i, seg) in segments.iter().enumerate() {
1766 let is_last = i == segments.len() - 1;
1767 let ack = Apdu::ComplexAck(ComplexAck {
1768 segmented: true,
1769 more_follows: !is_last,
1770 invoke_id,
1771 sequence_number: Some(i as u8),
1772 proposed_window_size: Some(1),
1773 service_choice,
1774 service_ack: seg.clone(),
1775 });
1776 let mut buf = BytesMut::new();
1777 encode_apdu(&mut buf, &ack);
1778 net_b
1779 .send_apdu(&buf, &received.source_mac, false, NetworkPriority::NORMAL)
1780 .await
1781 .unwrap();
1782
1783 let seg_ack_msg = timeout(Duration::from_secs(2), rx_b.recv())
1785 .await
1786 .unwrap()
1787 .unwrap();
1788 let decoded = apdu::decode_apdu(seg_ack_msg.apdu.clone()).unwrap();
1789 if let Apdu::SegmentAck(sa) = decoded {
1790 assert_eq!(sa.invoke_id, invoke_id);
1791 assert_eq!(sa.sequence_number, i as u8);
1792 } else {
1793 panic!("Expected SegmentAck, got {:?}", decoded);
1794 }
1795 }
1796
1797 net_b.stop().await.unwrap();
1798 });
1799
1800 let result = client
1802 .confirmed_request(&b_mac, ConfirmedServiceChoice::READ_PROPERTY, &[0x01])
1803 .await;
1804
1805 assert!(result.is_ok());
1806 assert_eq!(
1807 result.unwrap(),
1808 vec![0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08]
1809 );
1810
1811 b_handle.await.unwrap();
1812 client.stop().await.unwrap();
1813 }
1814
1815 #[tokio::test]
1816 async fn segmented_confirmed_request_sends_segments() {
1817 let mut client = BACnetClient::builder()
1820 .interface(Ipv4Addr::LOCALHOST)
1821 .port(0)
1822 .apdu_timeout_ms(5000)
1823 .max_apdu_length(50)
1824 .build()
1825 .await
1826 .unwrap();
1827
1828 let transport_b = BipTransport::new(Ipv4Addr::LOCALHOST, 0, Ipv4Addr::BROADCAST);
1829 let mut net_b = NetworkLayer::new(transport_b);
1830 let mut rx_b = net_b.start().await.unwrap();
1831 let b_mac = net_b.local_mac().to_vec();
1832
1833 let service_data: Vec<u8> = (0u8..100).collect();
1835 let expected_data = service_data.clone();
1836
1837 let b_handle = tokio::spawn(async move {
1838 let mut all_service_data = Vec::new();
1839 let mut client_mac;
1840 let mut invoke_id;
1841
1842 loop {
1843 let received = timeout(Duration::from_secs(3), rx_b.recv())
1844 .await
1845 .expect("server timed out waiting for segment")
1846 .expect("server channel closed");
1847
1848 let decoded = apdu::decode_apdu(received.apdu.clone()).unwrap();
1849 if let Apdu::ConfirmedRequest(req) = decoded {
1850 assert!(req.segmented, "expected segmented request");
1851 invoke_id = req.invoke_id;
1852 client_mac = received.source_mac.clone();
1853 let seq = req.sequence_number.unwrap();
1854 all_service_data.extend_from_slice(&req.service_request);
1855
1856 let seg_ack = Apdu::SegmentAck(SegmentAckPdu {
1858 negative_ack: false,
1859 sent_by_server: true,
1860 invoke_id,
1861 sequence_number: seq,
1862 actual_window_size: 1,
1863 });
1864 let mut buf = BytesMut::new();
1865 encode_apdu(&mut buf, &seg_ack);
1866 net_b
1867 .send_apdu(&buf, &received.source_mac, false, NetworkPriority::NORMAL)
1868 .await
1869 .unwrap();
1870
1871 if !req.more_follows {
1872 break;
1873 }
1874 } else {
1875 panic!("Expected ConfirmedRequest, got {:?}", decoded);
1876 }
1877 }
1878
1879 let ack = Apdu::SimpleAck(SimpleAck {
1881 invoke_id,
1882 service_choice: ConfirmedServiceChoice::WRITE_PROPERTY,
1883 });
1884 let mut buf = BytesMut::new();
1885 encode_apdu(&mut buf, &ack);
1886 net_b
1887 .send_apdu(&buf, &client_mac, false, NetworkPriority::NORMAL)
1888 .await
1889 .unwrap();
1890
1891 net_b.stop().await.unwrap();
1892 all_service_data
1893 });
1894
1895 let result = client
1896 .confirmed_request(
1897 &b_mac,
1898 ConfirmedServiceChoice::WRITE_PROPERTY,
1899 &service_data,
1900 )
1901 .await;
1902
1903 assert!(result.is_ok());
1904 assert!(result.unwrap().is_empty()); let received_data = b_handle.await.unwrap();
1908 assert_eq!(received_data, expected_data);
1909
1910 client.stop().await.unwrap();
1911 }
1912
1913 #[tokio::test]
1914 async fn segmented_request_with_complex_ack_response() {
1915 let mut client = BACnetClient::builder()
1916 .interface(Ipv4Addr::LOCALHOST)
1917 .port(0)
1918 .apdu_timeout_ms(5000)
1919 .max_apdu_length(50)
1920 .build()
1921 .await
1922 .unwrap();
1923
1924 let transport_b = BipTransport::new(Ipv4Addr::LOCALHOST, 0, Ipv4Addr::BROADCAST);
1925 let mut net_b = NetworkLayer::new(transport_b);
1926 let mut rx_b = net_b.start().await.unwrap();
1927 let b_mac = net_b.local_mac().to_vec();
1928
1929 let service_data: Vec<u8> = (0u8..60).collect();
1931
1932 let b_handle = tokio::spawn(async move {
1933 let mut client_mac;
1934 let mut invoke_id;
1935
1936 loop {
1937 let received = timeout(Duration::from_secs(3), rx_b.recv())
1938 .await
1939 .unwrap()
1940 .unwrap();
1941
1942 let decoded = apdu::decode_apdu(received.apdu.clone()).unwrap();
1943 if let Apdu::ConfirmedRequest(req) = decoded {
1944 invoke_id = req.invoke_id;
1945 client_mac = received.source_mac.clone();
1946 let seq = req.sequence_number.unwrap();
1947
1948 let seg_ack = Apdu::SegmentAck(SegmentAckPdu {
1949 negative_ack: false,
1950 sent_by_server: true,
1951 invoke_id,
1952 sequence_number: seq,
1953 actual_window_size: 1,
1954 });
1955 let mut buf = BytesMut::new();
1956 encode_apdu(&mut buf, &seg_ack);
1957 net_b
1958 .send_apdu(&buf, &received.source_mac, false, NetworkPriority::NORMAL)
1959 .await
1960 .unwrap();
1961
1962 if !req.more_follows {
1963 break;
1964 }
1965 }
1966 }
1967
1968 let ack = Apdu::ComplexAck(ComplexAck {
1970 segmented: false,
1971 more_follows: false,
1972 invoke_id,
1973 sequence_number: None,
1974 proposed_window_size: None,
1975 service_choice: ConfirmedServiceChoice::READ_PROPERTY,
1976 service_ack: Bytes::from_static(&[0xCA, 0xFE]),
1977 });
1978 let mut buf = BytesMut::new();
1979 encode_apdu(&mut buf, &ack);
1980 net_b
1981 .send_apdu(&buf, &client_mac, false, NetworkPriority::NORMAL)
1982 .await
1983 .unwrap();
1984
1985 net_b.stop().await.unwrap();
1986 });
1987
1988 let result = client
1989 .confirmed_request(&b_mac, ConfirmedServiceChoice::READ_PROPERTY, &service_data)
1990 .await;
1991
1992 assert!(result.is_ok());
1993 assert_eq!(result.unwrap(), vec![0xCA, 0xFE]);
1994
1995 b_handle.await.unwrap();
1996 client.stop().await.unwrap();
1997 }
1998
1999 #[tokio::test]
2000 async fn segment_overflow_guard() {
2001 let mut client = BACnetClient::builder()
2003 .interface(Ipv4Addr::LOCALHOST)
2004 .port(0)
2005 .apdu_timeout_ms(2000)
2006 .max_apdu_length(50)
2007 .build()
2008 .await
2009 .unwrap();
2010
2011 let huge_payload = vec![0u8; 256 * 44];
2014
2015 let fake_mac = vec![10, 99, 99, 99, 0xBA, 0xC0];
2018
2019 let result = client
2020 .confirmed_request(
2021 &fake_mac,
2022 ConfirmedServiceChoice::READ_PROPERTY,
2023 &huge_payload,
2024 )
2025 .await;
2026
2027 assert!(result.is_err());
2028 let err_msg = result.unwrap_err().to_string();
2029 assert!(
2030 err_msg.contains("256 segments"),
2031 "expected segment overflow error, got: {}",
2032 err_msg
2033 );
2034
2035 client.stop().await.unwrap();
2036 }
2037
2038 #[test]
2039 fn seg_receiver_timeout_is_4s() {
2040 assert_eq!(SEG_RECEIVER_TIMEOUT, Duration::from_secs(4));
2041 }
2042}