use std::collections::HashMap;
use std::net::Ipv4Addr;
#[cfg(feature = "ipv6")]
use std::net::Ipv6Addr;
use std::sync::Arc;
use std::time::Instant;
use bytes::{Bytes, BytesMut};
use tokio::sync::{broadcast, mpsc, Mutex};
use tokio::task::JoinHandle;
use tokio::time::{timeout, Duration};
use tracing::{debug, warn};
use bacnet_encoding::apdu::{
self, encode_apdu, validate_max_apdu_length, AbortPdu, Apdu,
ConfirmedRequest as ConfirmedRequestPdu, SegmentAck as SegmentAckPdu, SimpleAck,
};
use bacnet_encoding::npdu::NpduAddress;
use bacnet_network::layer::NetworkLayer;
use bacnet_services::cov::COVNotificationRequest;
use bacnet_transport::bip::BipTransport;
#[cfg(feature = "ipv6")]
use bacnet_transport::bip6::Bip6Transport;
use bacnet_transport::port::TransportPort;
use bacnet_types::enums::{ConfirmedServiceChoice, NetworkPriority, UnconfirmedServiceChoice};
use bacnet_types::error::Error;
use bacnet_types::MacAddr;
use crate::discovery::{DeviceTable, DiscoveredDevice};
use crate::segmentation::{max_segment_payload, split_payload, SegmentReceiver, SegmentedPduType};
use crate::tsm::{Tsm, TsmConfig, TsmResponse};
/// Settings shared by all client builders in this module.
///
/// `BipClientBuilder::build` passes `interface`, `port` and
/// `broadcast_address` to the BACnet/IP transport constructor; the whole
/// struct is then handed to `BACnetClient::start`.
#[derive(Debug, Clone)]
pub struct ClientConfig {
    /// IPv4 interface address for the BACnet/IP transport (default: unspecified).
    pub interface: Ipv4Addr,
    /// Port for the transport (default: `0xBAC0`, i.e. 47808).
    pub port: u16,
    /// IPv4 broadcast address for the BACnet/IP transport
    /// (default: `255.255.255.255`).
    pub broadcast_address: Ipv4Addr,
    /// APDU timeout in milliseconds (default: 6000).
    pub apdu_timeout_ms: u64,
    /// APDU retry count (default: 3).
    pub apdu_retries: u8,
    /// Maximum APDU length (default: 1476).
    pub max_apdu_length: u16,
    /// Optional segment-count limit (default: `None`).
    pub max_segments: Option<u8>,
    /// Whether segmented responses are accepted (default: `true`).
    pub segmented_response_accepted: bool,
    /// Proposed segmentation window size (default: 1).
    pub proposed_window_size: u8,
}

impl Default for ClientConfig {
    /// Returns the stock configuration; each default is listed on the
    /// corresponding field.
    fn default() -> Self {
        let interface = Ipv4Addr::UNSPECIFIED;
        let broadcast_address = Ipv4Addr::BROADCAST;
        // 0xBAC0 is 47808 in decimal.
        let port: u16 = 0xBAC0;
        let apdu_timeout_ms: u64 = 6000;
        let apdu_retries: u8 = 3;
        let max_apdu_length: u16 = 1476;
        let proposed_window_size: u8 = 1;

        ClientConfig {
            interface,
            port,
            broadcast_address,
            apdu_timeout_ms,
            apdu_retries,
            max_apdu_length,
            max_segments: None,
            segmented_response_accepted: true,
            proposed_window_size,
        }
    }
}
/// Builder for a `BACnetClient` running on a caller-provided transport `T`.
///
/// Obtained from `BACnetClient::generic_builder`; `build` returns an error
/// when no transport has been supplied.
pub struct ClientBuilder<T: TransportPort> {
/// Settings forwarded to `BACnetClient::start` by `build`.
config: ClientConfig,
/// Transport to start the client on; `None` until `transport()` is called.
transport: Option<T>,
}
impl<T: TransportPort + 'static> ClientBuilder<T> {
pub fn transport(mut self, transport: T) -> Self {
self.transport = Some(transport);
self
}
pub fn apdu_timeout_ms(mut self, ms: u64) -> Self {
self.config.apdu_timeout_ms = ms;
self
}
pub fn max_apdu_length(mut self, len: u16) -> Self {
self.config.max_apdu_length = len;
self
}
pub async fn build(self) -> Result<BACnetClient<T>, Error> {
let transport = self
.transport
.ok_or_else(|| Error::Encoding("transport not set on ClientBuilder".into()))?;
BACnetClient::start(self.config, transport).await
}
}
/// Builder for a `BACnetClient` backed by `BipTransport`.
///
/// `build` constructs the transport from the `interface`, `port` and
/// `broadcast_address` fields of the stored configuration.
pub struct BipClientBuilder {
/// Settings accumulated by the setter methods; `BACnetClient::bip_builder`
/// seeds it with `ClientConfig::default()`.
config: ClientConfig,
}
impl BipClientBuilder {
pub fn interface(mut self, ip: Ipv4Addr) -> Self {
self.config.interface = ip;
self
}
pub fn port(mut self, port: u16) -> Self {
self.config.port = port;
self
}
pub fn broadcast_address(mut self, addr: Ipv4Addr) -> Self {
self.config.broadcast_address = addr;
self
}
pub fn apdu_timeout_ms(mut self, ms: u64) -> Self {
self.config.apdu_timeout_ms = ms;
self
}
pub fn max_apdu_length(mut self, len: u16) -> Self {
self.config.max_apdu_length = len;
self
}
pub async fn build(self) -> Result<BACnetClient<BipTransport>, Error> {
let transport = BipTransport::new(
self.config.interface,
self.config.port,
self.config.broadcast_address,
);
BACnetClient::start(self.config, transport).await
}
}
/// Default concurrency value (32) for batch operations.
///
/// NOTE(review): not referenced in the visible part of this file; presumably
/// it caps how many per-device requests run at once. Confirm at the use site.
const DEFAULT_BATCH_CONCURRENCY: usize = 32;
/// Describes a single property read aimed at one device.
#[derive(Debug, Clone)]
pub struct DeviceReadRequest {
/// Instance number of the target device.
pub device_instance: u32,
/// Object to read from.
pub object_identifier: bacnet_types::primitives::ObjectIdentifier,
/// Property to read.
pub property_identifier: bacnet_types::enums::PropertyIdentifier,
/// Optional array index within the property; `None` when no index is given.
pub property_array_index: Option<u32>,
}
/// Outcome of a property read for one device.
#[derive(Debug)]
pub struct DeviceReadResult {
/// Instance number of the device this result belongs to.
pub device_instance: u32,
/// The `ReadPropertyACK` on success, or the error that occurred.
pub result: Result<bacnet_services::read_property::ReadPropertyACK, Error>,
}
/// Describes a read-property-multiple (`rpm`) request aimed at one device.
#[derive(Debug, Clone)]
pub struct DeviceRpmRequest {
/// Instance number of the target device.
pub device_instance: u32,
/// Read access specifications to request from the device.
pub specs: Vec<bacnet_services::rpm::ReadAccessSpecification>,
}
/// Outcome of a read-property-multiple request for one device.
#[derive(Debug)]
pub struct DeviceRpmResult {
/// Instance number of the device this result belongs to.
pub device_instance: u32,
/// The `ReadPropertyMultipleACK` on success, or the error that occurred.
pub result: Result<bacnet_services::rpm::ReadPropertyMultipleACK, Error>,
}
/// Describes a single property write aimed at one device.
#[derive(Debug, Clone)]
pub struct DeviceWriteRequest {
/// Instance number of the target device.
pub device_instance: u32,
/// Object to write to.
pub object_identifier: bacnet_types::primitives::ObjectIdentifier,
/// Property to write.
pub property_identifier: bacnet_types::enums::PropertyIdentifier,
/// Optional array index within the property; `None` when no index is given.
pub property_array_index: Option<u32>,
/// Value to write, as raw bytes.
/// NOTE(review): presumably the already-encoded property value; confirm
/// against the code that consumes this request.
pub property_value: Vec<u8>,
/// Optional write priority; `None` when no priority is given.
pub priority: Option<u8>,
}
/// Outcome of a property write for one device.
#[derive(Debug)]
pub struct DeviceWriteResult {
/// Instance number of the device this result belongs to.
pub device_instance: u32,
/// `Ok(())` on success, or the error that occurred.
pub result: Result<(), Error>,
}
/// Bookkeeping for one segmented message that is being received.
///
/// NOTE(review): this struct is not used in the visible part of the file; the
/// field notes below are read off the names and types and should be confirmed
/// against the dispatch/segmentation code.
struct SegmentedReceiveState {
/// Receiver that accumulates the incoming segments.
receiver: SegmentReceiver,
/// MAC address associated with replies (presumably where acks are sent).
reply_mac: MacAddr,
/// Sequence number expected for the next segment.
expected_next_seq: u8,
/// Time of the most recent activity on this transfer.
last_activity: Instant,
/// Position within the current window (presumably segments seen so far).
window_position: u8,
/// Window size proposed for this transfer.
proposed_window_size: u8,
}
/// Four-second duration used as the segment-receiver timeout.
///
/// NOTE(review): presumably the idle limit compared against
/// `SegmentedReceiveState::last_activity`; the use site is outside this view.
const SEG_RECEIVER_TIMEOUT: Duration = Duration::from_secs(4);
/// Key type of the `seg_ack_senders` map on `BACnetClient`: a MAC address
/// paired with a `u8`.
/// NOTE(review): the `u8` is presumably the invoke id; confirm at the use site.
type SegKey = (MacAddr, u8);
/// BACnet client bound to a transport `T`.
///
/// Instances are created through the builders in this file, all of which end
/// in `BACnetClient::start`.
pub struct BACnetClient<T: TransportPort> {
/// Client configuration.
config: ClientConfig,
/// Shared network layer; the BACnet/IP helper methods reach the transport
/// through `network.transport()`.
network: Arc<NetworkLayer<T>>,
/// Shared, mutex-guarded `Tsm` instance (from `crate::tsm`).
tsm: Arc<Mutex<Tsm>>,
/// Shared, mutex-guarded table of discovered devices.
device_table: Arc<Mutex<DeviceTable>>,
/// Broadcast-channel sender carrying `COVNotificationRequest` values.
cov_tx: broadcast::Sender<COVNotificationRequest>,
/// Join handle of the dispatch task, when one is held.
dispatch_task: Option<JoinHandle<()>>,
/// Per-`SegKey` channel senders for `SegmentAckPdu` values.
/// NOTE(review): presumably one entry per in-flight segmented request; confirm.
seg_ack_senders: Arc<Mutex<HashMap<SegKey, mpsc::Sender<SegmentAckPdu>>>>,
/// MAC address recorded for this client.
/// NOTE(review): presumably the local transport address; confirm in `start`.
local_mac: MacAddr,
}
impl BACnetClient<BipTransport> {
    /// Returns a BACnet/IP client builder seeded with `ClientConfig::default()`.
    pub fn bip_builder() -> BipClientBuilder {
        let config = ClientConfig::default();
        BipClientBuilder { config }
    }

    /// Alias for `bip_builder`.
    pub fn builder() -> BipClientBuilder {
        Self::bip_builder()
    }

    /// Forwards to the transport's `read_bdt` for `target` and returns its
    /// result unchanged.
    pub async fn read_bdt(
        &self,
        target: &[u8],
    ) -> Result<Vec<bacnet_transport::bbmd::BdtEntry>, Error> {
        let bip = self.network.transport();
        bip.read_bdt(target).await
    }

    /// Forwards to the transport's `write_bdt` with `target` and `entries` and
    /// returns its result unchanged.
    pub async fn write_bdt(
        &self,
        target: &[u8],
        entries: &[bacnet_transport::bbmd::BdtEntry],
    ) -> Result<bacnet_types::enums::BvlcResultCode, Error> {
        let bip = self.network.transport();
        bip.write_bdt(target, entries).await
    }

    /// Forwards to the transport's `read_fdt` for `target` and returns its
    /// result unchanged.
    pub async fn read_fdt(
        &self,
        target: &[u8],
    ) -> Result<Vec<bacnet_transport::bbmd::FdtEntryWire>, Error> {
        let bip = self.network.transport();
        bip.read_fdt(target).await
    }

    /// Forwards to the transport's `delete_fdt_entry`, identifying the entry
    /// by `ip` and `port`, and returns its result unchanged.
    pub async fn delete_fdt_entry(
        &self,
        target: &[u8],
        ip: [u8; 4],
        port: u16,
    ) -> Result<bacnet_types::enums::BvlcResultCode, Error> {
        let bip = self.network.transport();
        bip.delete_fdt_entry(target, ip, port).await
    }

    /// Forwards to the transport's `register_foreign_device_bvlc` with
    /// `target` and `ttl` and returns its result unchanged.
    pub async fn register_foreign_device_bvlc(
        &self,
        target: &[u8],
        ttl: u16,
    ) -> Result<bacnet_types::enums::BvlcResultCode, Error> {
        let bip = self.network.transport();
        bip.register_foreign_device_bvlc(target, ttl).await
    }
}
#[cfg(feature = "ipv6")]
impl BACnetClient<Bip6Transport> {
    /// Returns a BACnet/IPv6 client builder with the default configuration,
    /// the unspecified IPv6 interface and no device instance.
    pub fn bip6_builder() -> Bip6ClientBuilder {
        let config = ClientConfig::default();
        let interface = Ipv6Addr::UNSPECIFIED;
        Bip6ClientBuilder {
            config,
            interface,
            device_instance: None,
        }
    }
}
/// Builder for a `BACnetClient` backed by `Bip6Transport`.
///
/// Only available with the `ipv6` feature.
#[cfg(feature = "ipv6")]
pub struct Bip6ClientBuilder {
/// Shared client settings; `build` passes `config.port` to the transport.
config: ClientConfig,
/// IPv6 interface address passed to `Bip6Transport::new`.
interface: Ipv6Addr,
/// Optional device instance passed to `Bip6Transport::new`.
device_instance: Option<u32>,
}
#[cfg(feature = "ipv6")]
impl Bip6ClientBuilder {
    /// Sets the IPv6 interface address passed to the transport.
    pub fn interface(self, ip: Ipv6Addr) -> Self {
        Self {
            interface: ip,
            ..self
        }
    }

    /// Sets the port passed to the transport.
    pub fn port(self, port: u16) -> Self {
        let config = ClientConfig {
            port,
            ..self.config
        };
        Self { config, ..self }
    }

    /// Sets the device instance passed to the transport.
    pub fn device_instance(self, instance: u32) -> Self {
        Self {
            device_instance: Some(instance),
            ..self
        }
    }

    /// Overrides `apdu_timeout_ms` in the pending configuration.
    pub fn apdu_timeout_ms(self, ms: u64) -> Self {
        let config = ClientConfig {
            apdu_timeout_ms: ms,
            ..self.config
        };
        Self { config, ..self }
    }

    /// Overrides `max_apdu_length` in the pending configuration.
    pub fn max_apdu_length(self, len: u16) -> Self {
        let config = ClientConfig {
            max_apdu_length: len,
            ..self.config
        };
        Self { config, ..self }
    }

    /// Creates the `Bip6Transport` from the interface, configured port and
    /// device instance, then starts the client via `BACnetClient::start`.
    ///
    /// # Errors
    ///
    /// Returns whatever error `BACnetClient::start` reports.
    pub async fn build(self) -> Result<BACnetClient<Bip6Transport>, Error> {
        let Self {
            config,
            interface,
            device_instance,
        } = self;
        let transport = Bip6Transport::new(interface, config.port, device_instance);
        BACnetClient::start(config, transport).await
    }
}
#[cfg(feature = "sc-tls")]
impl BACnetClient<bacnet_transport::sc::ScTransport<bacnet_transport::sc_tls::TlsWebSocket>> {
    /// Returns a secure-connect client builder with the default
    /// configuration, an empty hub URL, no TLS configuration, an all-zero
    /// VMAC, a 30 s heartbeat interval, a 60 s heartbeat timeout and no
    /// reconnect configuration.
    pub fn sc_builder() -> ScClientBuilder {
        // Heartbeat defaults, expressed in milliseconds.
        const HEARTBEAT_INTERVAL_MS: u64 = 30_000;
        const HEARTBEAT_TIMEOUT_MS: u64 = 60_000;

        let config = ClientConfig::default();
        let hub_url = String::new();
        ScClientBuilder {
            config,
            hub_url,
            tls_config: None,
            vmac: [0; 6],
            heartbeat_interval_ms: HEARTBEAT_INTERVAL_MS,
            heartbeat_timeout_ms: HEARTBEAT_TIMEOUT_MS,
            reconnect: None,
        }
    }
}
/// Builder for a `BACnetClient` backed by `ScTransport<TlsWebSocket>`.
///
/// Only available with the `sc-tls` feature. `build` requires `tls_config`
/// to have been set.
#[cfg(feature = "sc-tls")]
pub struct ScClientBuilder {
/// Shared client settings forwarded to `BACnetClient::start`.
config: ClientConfig,
/// Hub URL passed to `TlsWebSocket::connect`; empty until `hub_url()` is called.
hub_url: String,
/// TLS client configuration; `build` fails while this is `None`.
tls_config: Option<std::sync::Arc<tokio_rustls::rustls::ClientConfig>>,
/// VMAC passed to `ScTransport::new`; all zeros by default.
vmac: bacnet_transport::sc_frame::Vmac,
/// Heartbeat interval in milliseconds (30 000 by default).
heartbeat_interval_ms: u64,
/// Heartbeat timeout in milliseconds (60 000 by default).
heartbeat_timeout_ms: u64,
/// Optional reconnect configuration applied to the transport in `build`.
reconnect: Option<bacnet_transport::sc::ScReconnectConfig>,
}
#[cfg(feature = "sc-tls")]
impl ScClientBuilder {
    /// Sets the hub URL handed to `TlsWebSocket::connect` by `build`.
    pub fn hub_url(mut self, url: &str) -> Self {
        self.hub_url = url.to_string();
        self
    }

    /// Sets the TLS client configuration. `build` fails without it.
    pub fn tls_config(
        mut self,
        config: std::sync::Arc<tokio_rustls::rustls::ClientConfig>,
    ) -> Self {
        self.tls_config = Some(config);
        self
    }

    /// Sets the VMAC handed to `ScTransport::new`.
    pub fn vmac(mut self, vmac: [u8; 6]) -> Self {
        self.vmac = vmac;
        self
    }

    /// Overrides `apdu_timeout_ms` in the pending configuration.
    pub fn apdu_timeout_ms(mut self, ms: u64) -> Self {
        self.config.apdu_timeout_ms = ms;
        self
    }

    /// Overrides `max_apdu_length` in the pending configuration.
    ///
    /// Provided for parity with the other client builders in this file, which
    /// all expose this setter.
    pub fn max_apdu_length(mut self, len: u16) -> Self {
        self.config.max_apdu_length = len;
        self
    }

    /// Sets the heartbeat interval (milliseconds) applied to the transport.
    pub fn heartbeat_interval_ms(mut self, ms: u64) -> Self {
        self.heartbeat_interval_ms = ms;
        self
    }

    /// Sets the heartbeat timeout (milliseconds) applied to the transport.
    pub fn heartbeat_timeout_ms(mut self, ms: u64) -> Self {
        self.heartbeat_timeout_ms = ms;
        self
    }

    /// Sets the reconnect configuration applied to the transport in `build`.
    pub fn reconnect(mut self, config: bacnet_transport::sc::ScReconnectConfig) -> Self {
        self.reconnect = Some(config);
        self
    }

    /// Connects the TLS WebSocket to the hub, wraps it in an `ScTransport`
    /// configured with the heartbeat settings (and reconnect settings, when
    /// present), then starts the client via `BACnetClient::start`.
    ///
    /// # Errors
    ///
    /// Returns `Error::Encoding` when no TLS configuration was supplied, and
    /// propagates any error from `TlsWebSocket::connect` or
    /// `BACnetClient::start`.
    pub async fn build(
        self,
    ) -> Result<
        BACnetClient<bacnet_transport::sc::ScTransport<bacnet_transport::sc_tls::TlsWebSocket>>,
        Error,
    > {
        let tls_config = self
            .tls_config
            .ok_or_else(|| Error::Encoding("SC client builder: tls_config is required".into()))?;
        let ws = bacnet_transport::sc_tls::TlsWebSocket::connect(&self.hub_url, tls_config).await?;
        let mut transport = bacnet_transport::sc::ScTransport::new(ws, self.vmac)
            .with_heartbeat_interval_ms(self.heartbeat_interval_ms)
            .with_heartbeat_timeout_ms(self.heartbeat_timeout_ms);
        if let Some(rc) = self.reconnect {
            // The allow below implies `with_reconnect` is marked deprecated;
            // it is still called so that `reconnect()` keeps taking effect.
            #[allow(deprecated)]
            {
                transport = transport.with_reconnect(rc);
            }
        }
        BACnetClient::start(self.config, transport).await
    }
}
/// Destination of a confirmed request: either a directly addressed MAC or a
/// device on another network reached through a router.
#[derive(Clone, Copy)]
enum ConfirmedTarget<'a> {
/// Target addressed directly by its MAC address.
Local {
/// MAC address of the target.
mac: &'a [u8],
},
/// Target on network `dest_network`, reached via `router_mac`.
Routed {
/// MAC address of the router; not part of the TSM key built by `tsm_mac`.
router_mac: &'a [u8],
/// Network number of the final destination.
dest_network: u16,
/// MAC address of the final destination on `dest_network`.
dest_mac: &'a [u8],
},
}
impl<'a> ConfirmedTarget<'a> {
fn tsm_mac(&self) -> MacAddr {
match self {
Self::Local { mac } => MacAddr::from_slice(mac),
Self::Routed {
dest_network,
dest_mac,
..
} => routed_tsm_mac(*dest_network, dest_mac),
}
}
}
/// Builds the synthetic key used for a device behind a router.
///
/// Layout: `0xFF`, `b'R'`, the network number as two big-endian bytes, the
/// MAC length as one byte, then the MAC bytes themselves. The length byte is
/// `mac.len()` truncated to `u8`.
fn routed_tsm_mac(network: u16, mac: &[u8]) -> MacAddr {
    let [net_hi, net_lo] = network.to_be_bytes();
    // Fixed-size prefix that precedes the MAC bytes.
    let header = [0xFF, b'R', net_hi, net_lo, mac.len() as u8];

    let mut key = MacAddr::new();
    key.extend_from_slice(&header);
    key.extend_from_slice(mac);
    key
}
/// Derives the key for an incoming response.
///
/// When `source_network` carries an address with a non-empty MAC, the routed
/// key for that network/MAC pair is returned; in every other case the key is
/// simply a copy of `source_mac`.
fn response_tsm_mac(source_mac: &[u8], source_network: &Option<NpduAddress>) -> MacAddr {
    source_network
        .as_ref()
        .filter(|origin| !origin.mac_address.is_empty())
        .map_or_else(
            || MacAddr::from_slice(source_mac),
            |origin| routed_tsm_mac(origin.network, &origin.mac_address),
        )
}
// Private submodules of the client.
// NOTE(review): judging by their names, these hold the per-area method
// implementations for `BACnetClient`; confirm in each file.
mod cov;
mod device_mgmt;
mod discovery;
mod dispatch;
mod file_list;
mod lifecycle;
mod object_mgmt;
mod property;
mod requests;
mod segmentation;
// Test module; compiled only for test builds.
#[cfg(test)]
mod tests;
impl<T: TransportPort + 'static> BACnetClient<T> {
    /// Returns a transport-agnostic builder with the default configuration
    /// and no transport set; supply one with `ClientBuilder::transport`
    /// before calling `build`.
    pub fn generic_builder() -> ClientBuilder<T> {
        let config = ClientConfig::default();
        ClientBuilder {
            config,
            transport: None,
        }
    }
}