#![warn(missing_docs)]
use socket2::{Domain, Protocol, SockAddr, Socket, Type};
use crate::packet::{E131RootLayerData::*, *};
use crate::error::errors::*;
use uuid::Uuid;
use std::cmp::{Ordering, max};
use std::collections::HashMap;
use std::fmt;
use std::io::Read;
use std::net::{Ipv4Addr, SocketAddr};
use std::time::{Duration, Instant};
#[cfg(not(target_os = "windows"))]
use std::net::{IpAddr, Ipv6Addr};
#[cfg(not(target_os = "windows"))]
use libc::{AF_INET, AF_INET6};
#[cfg(target_os = "windows")]
const AF_INET: i32 = 2;
#[cfg(target_os = "windows")]
const AF_INET6: i32 = 23;
#[cfg(target_os = "windows")]
use std::net::IpAddr;
pub const RCV_BUF_DEFAULT_SIZE: usize = 1144;
pub const DMX_PAYLOAD_SIZE: usize = 513;
const PROCESS_PREVIEW_DATA_DEFAULT: bool = false;
const ANNOUNCE_SOURCE_DISCOVERY_DEFAULT: bool = false;
const ANNOUNCE_STREAM_TERMINATION_DEFAULT: bool = false;
const ANNOUNCE_TIMEOUT_DEFAULT: bool = false;
const DEFAULT_MERGE_FUNC: fn(&DMXData, &DMXData) -> Result<DMXData> =
discard_lowest_priority_then_previous;
#[derive(Debug)]
pub struct DMXData {
pub universe: u16,
pub values: Vec<u8>,
pub sync_uni: u16,
pub priority: u8,
pub src_cid: Option<Uuid>,
pub preview: bool,
pub recv_timestamp: Instant,
}
pub struct SacnReceiver {
receiver: SacnNetworkReceiver,
waiting_data: HashMap<u16, DMXData>,
universes: Vec<u16>,
discovered_sources: Vec<DiscoveredSacnSource>,
merge_func: fn(&DMXData, &DMXData) -> Result<DMXData>,
partially_discovered_sources: Vec<DiscoveredSacnSource>,
source_limit: Option<usize>,
sequences: SequenceNumbering,
process_preview_data: bool,
announce_source_discovery: bool,
announce_stream_termination: bool,
announce_timeout: bool,
}
#[derive(Clone, Debug)]
pub struct DiscoveredSacnSource {
pub name: String,
pub cid: Uuid,
pub last_updated: Instant,
pages: Vec<UniversePage>,
last_page: u8,
}
#[derive(Debug)]
struct SacnNetworkReceiver {
socket: Socket,
addr: SocketAddr,
is_multicast_enabled: bool,
}
#[derive(Eq, Ord, PartialEq, PartialOrd, Clone, Debug)]
struct UniversePage {
page: u8,
universes: Vec<u16>,
}
impl fmt::Debug for SacnReceiver {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
write!(f, "{:?}", self.receiver)?;
write!(f, "{:?}", self.waiting_data)?;
write!(f, "{:?}", self.universes)?;
write!(f, "{:?}", self.discovered_sources)?;
write!(f, "{:?}", self.partially_discovered_sources)
}
}
impl SacnReceiver {
pub fn with_ip(ip: SocketAddr, source_limit: Option<usize>) -> Result<SacnReceiver> {
if let Some(x) = source_limit
&& x == 0
{
return Err(SacnError::SourceLimitZero());
};
let mut sri = SacnReceiver {
receiver: SacnNetworkReceiver::new(ip)?,
waiting_data: HashMap::new(),
universes: Vec::new(),
discovered_sources: Vec::new(),
merge_func: DEFAULT_MERGE_FUNC,
partially_discovered_sources: Vec::new(),
process_preview_data: PROCESS_PREVIEW_DATA_DEFAULT,
source_limit,
sequences: SequenceNumbering::new(),
announce_source_discovery: ANNOUNCE_SOURCE_DISCOVERY_DEFAULT,
announce_stream_termination: ANNOUNCE_STREAM_TERMINATION_DEFAULT,
announce_timeout: ANNOUNCE_TIMEOUT_DEFAULT,
};
sri.listen_universes(&[E131_DISCOVERY_UNIVERSE])?;
Ok(sri)
}
pub fn set_is_multicast_enabled(&mut self, val: bool) -> Result<()> {
self.receiver.set_is_multicast_enabled(val)
}
pub fn is_multicast_enabled(&self) -> bool {
self.receiver.is_multicast_enabled()
}
pub fn reset_sources(&mut self) {
self.sequences.clear();
self.partially_discovered_sources.clear();
self.discovered_sources.clear();
}
pub fn clear_all_waiting_data(&mut self) {
self.waiting_data.clear();
}
pub fn clear_waiting_data(&mut self, universe: u16) -> bool {
self.waiting_data.remove(&universe).is_some()
}
pub fn set_merge_fn(&mut self, func: fn(&DMXData, &DMXData) -> Result<DMXData>) -> Result<()> {
self.merge_func = func;
Ok(())
}
pub fn set_ipv6_only(&mut self, val: bool) -> Result<()> {
self.receiver.set_only_v6(val)
}
pub fn listen_universes(&mut self, universes: &[u16]) -> Result<()> {
for u in universes {
is_universe_in_range(*u)?;
}
for u in universes {
match self.universes.binary_search(u) {
Err(i) => {
self.universes.insert(i, *u);
if self.is_multicast_enabled() {
self.receiver.listen_multicast_universe(*u)?;
}
}
Ok(_) => { }
}
}
Ok(())
}
pub fn mute_universe(&mut self, universe: u16) -> Result<()> {
is_universe_in_range(universe)?;
match self.universes.binary_search(&universe) {
Err(_) => {
Err(SacnError::UniverseNotFound(universe))
}
Ok(i) => {
self.universes.remove(i);
self.receiver.mute_multicast_universe(universe)
}
}
}
pub fn set_process_preview_data(&mut self, val: bool) {
self.process_preview_data = val;
}
pub fn is_listening(&self, universe: &u16) -> bool {
self.universes.contains(universe)
}
pub fn recv(&mut self, timeout: Option<Duration>) -> Result<Vec<DMXData>> {
if self.universes.len() == 1
&& self.universes[0] == E131_DISCOVERY_UNIVERSE
&& timeout.is_none()
&& !self.announce_source_discovery
{
return Err(SacnError::NoDataUniversesRegistered());
}
self.sequences.check_timeouts(self.announce_timeout)?;
self.check_waiting_data_timeouts();
if timeout == Some(Duration::from_secs(0)) {
if cfg!(target_os = "windows") {
return Err(std::io::Error::new(
std::io::ErrorKind::TimedOut,
"No data available in given timeout",
)
.into());
} else {
return Err(std::io::Error::new(
std::io::ErrorKind::WouldBlock,
"No data available in given timeout",
)
.into());
}
}
let actual_timeout =
if timeout.is_some() && timeout.unwrap() < E131_NETWORK_DATA_LOSS_TIMEOUT {
timeout
} else {
Some(E131_NETWORK_DATA_LOSS_TIMEOUT)
};
self.receiver.set_timeout(actual_timeout)?; let start_time = Instant::now();
let mut buf: [u8; RCV_BUF_DEFAULT_SIZE] = [0; RCV_BUF_DEFAULT_SIZE];
match self.receiver.recv(&mut buf) {
Ok(pkt) => {
let pdu: E131RootLayer = pkt.pdu;
let data: E131RootLayerData = pdu.data;
let res = match data {
DataPacket(d) => self.handle_data_packet(pdu.cid, d)?,
SynchronizationPacket(s) => self.handle_sync_packet(pdu.cid, s)?,
UniverseDiscoveryPacket(u) => {
let discovered_src: Option<String> =
self.handle_universe_discovery_packet(pdu.cid, u);
if let Some(src) = discovered_src
&& self.announce_source_discovery
{
return Err(SacnError::SourceDiscovered(src));
};
None
}
};
match res {
Some(r) => Ok(r),
None => {
if let Some(timeout) = timeout {
let elapsed = start_time.elapsed();
match timeout.checked_sub(elapsed) {
None => {
Err(std::io::Error::new(
std::io::ErrorKind::WouldBlock,
"No data available in given timeout",
)
.into())
}
Some(new_timeout) => self.recv(Some(new_timeout)),
}
} else {
self.recv(timeout)
}
}
}
}
Err(err) => {
match err {
SacnError::Io(ref s) => {
match s.kind() {
std::io::ErrorKind::WouldBlock | std::io::ErrorKind::TimedOut => {
if let Some(timeout) = timeout {
let elapsed = start_time.elapsed();
match timeout.checked_sub(elapsed) {
None => {
if cfg!(target_os = "windows") {
Err(std::io::Error::new(
std::io::ErrorKind::TimedOut,
"No data available in given timeout",
)
.into())
} else {
Err(std::io::Error::new(
std::io::ErrorKind::WouldBlock,
"No data available in given timeout",
)
.into())
}
}
Some(new_timeout) => self.recv(Some(new_timeout)),
}
} else {
self.recv(timeout)
}
}
_ => {
Err(err)
}
}
}
_ => {
Err(err)
}
}
}
}
}
pub fn get_announce_source_discovery(&self) -> bool {
self.announce_source_discovery
}
pub fn get_discovered_sources_no_check(&mut self) -> Vec<DiscoveredSacnSource> {
self.discovered_sources.clone()
}
pub fn get_discovered_sources(&mut self) -> Vec<DiscoveredSacnSource> {
self.remove_expired_sources();
self.discovered_sources.clone()
}
pub fn set_announce_source_discovery(&mut self, new_val: bool) {
self.announce_source_discovery = new_val;
}
pub fn get_announce_timeout(&self) -> bool {
self.announce_timeout
}
pub fn set_announce_timeout(&mut self, new_val: bool) {
self.announce_timeout = new_val;
}
pub fn get_announce_stream_termination(&self) -> bool {
self.announce_stream_termination
}
pub fn set_announce_stream_termination(&mut self, new_val: bool) {
self.announce_stream_termination = new_val;
}
fn handle_data_packet(
&mut self,
cid: Uuid,
data_pkt: DataPacketFramingLayer,
) -> Result<Option<Vec<DMXData>>> {
if data_pkt.preview_data && !self.process_preview_data {
return Ok(None);
}
if data_pkt.stream_terminated {
self.terminate_stream(cid, data_pkt.universe);
if self.announce_stream_termination {
return Err(SacnError::UniverseTerminated(cid, data_pkt.universe));
}
return Ok(None);
}
if !self.is_listening(&data_pkt.universe) {
return Ok(None); }
self.sequences.check_data_seq_number(
self.source_limit,
cid,
data_pkt.sequence_number,
data_pkt.universe,
self.announce_timeout,
)?;
if data_pkt.synchronization_address == E131_NO_SYNC_ADDR {
self.clear_waiting_data(data_pkt.universe);
let vals: Vec<u8> = data_pkt.data.property_values.into_owned();
let dmx_data: DMXData = DMXData {
universe: data_pkt.universe,
values: vals.to_vec(),
sync_uni: data_pkt.synchronization_address,
priority: data_pkt.priority,
src_cid: Some(cid),
preview: data_pkt.preview_data,
recv_timestamp: Instant::now(),
};
Ok(Some(vec![dmx_data]))
} else {
self.listen_universes(&[data_pkt.synchronization_address])?;
let vals: Vec<u8> = data_pkt.data.property_values.into_owned();
let dmx_data: DMXData = DMXData {
universe: data_pkt.universe,
values: vals.to_vec(),
sync_uni: data_pkt.synchronization_address,
priority: data_pkt.priority,
src_cid: Some(cid),
preview: data_pkt.preview_data,
recv_timestamp: Instant::now(),
};
self.store_waiting_data(dmx_data)?;
Ok(None)
}
}
fn terminate_stream(&mut self, src_cid: Uuid, universe: u16) {
let _ = self.sequences.remove_seq_numbers(src_cid, universe);
if let Some(index) = find_discovered_src(&self.discovered_sources, &src_cid)
{
self.discovered_sources[index].terminate_universe(universe);
}
}
fn store_waiting_data(&mut self, data: DMXData) -> Result<()> {
match self.waiting_data.remove(&data.universe) {
Some(existing) => {
self.waiting_data
.insert(data.universe, ((self.merge_func)(&existing, &data))?);
}
None => {
self.waiting_data.insert(data.universe, data);
}
}
Ok(())
}
fn handle_sync_packet(
&mut self,
cid: Uuid,
sync_pkt: SynchronizationPacketFramingLayer,
) -> Result<Option<Vec<DMXData>>> {
if !self.is_listening(&sync_pkt.synchronization_address) {
return Ok(None); }
self.sequences.check_sync_seq_number(
self.source_limit,
cid,
sync_pkt.sequence_number,
sync_pkt.synchronization_address,
self.announce_timeout,
)?;
let res = self.rtrv_waiting_data(sync_pkt.synchronization_address);
if res.is_empty() {
Ok(None)
} else {
Ok(Some(res))
}
}
fn rtrv_waiting_data(&mut self, sync_uni: u16) -> Vec<DMXData> {
let mut keys: Vec<u16> = Vec::new();
for (uni, data) in self.waiting_data.iter() {
if data.sync_uni == sync_uni {
keys.push(*uni);
}
}
let mut res: Vec<DMXData> = Vec::new();
for k in keys {
let data = self.waiting_data.remove(&k).unwrap();
if data.recv_timestamp.elapsed() < E131_NETWORK_DATA_LOSS_TIMEOUT {
res.push(data);
}
}
res
}
fn update_discovered_srcs(&mut self, src: DiscoveredSacnSource) {
if let Some(index) = find_discovered_src(&self.discovered_sources, &src.cid) {
self.discovered_sources.remove(index);
}
self.discovered_sources.push(src);
}
fn handle_universe_discovery_packet(
&mut self,
cid: Uuid,
discovery_pkt: UniverseDiscoveryPacketFramingLayer,
) -> Option<String> {
let data: UniverseDiscoveryPacketUniverseDiscoveryLayer = discovery_pkt.data;
let page: u8 = data.page;
let last_page: u8 = data.last_page;
let universes = data.universes;
let uni_page: UniversePage = UniversePage {
page,
universes: universes.into(),
};
match find_discovered_src(
&self.partially_discovered_sources,
&cid
) {
Some(index) => {
self.partially_discovered_sources[index]
.pages
.push(uni_page);
self.partially_discovered_sources[index].last_updated = Instant::now();
if self.partially_discovered_sources[index].has_all_pages() {
let discovered_src: DiscoveredSacnSource =
self.partially_discovered_sources.remove(index);
self.update_discovered_srcs(discovered_src);
return Some(discovery_pkt.source_name.to_string());
}
}
None => {
let discovered_src: DiscoveredSacnSource = DiscoveredSacnSource {
name: discovery_pkt.source_name.to_string(),
cid,
last_page,
pages: vec![uni_page],
last_updated: Instant::now(),
};
if page == 0 && page == last_page {
self.update_discovered_srcs(discovered_src);
return Some(discovery_pkt.source_name.to_string());
} else {
self.partially_discovered_sources.push(discovered_src);
}
}
}
None }
fn check_waiting_data_timeouts(&mut self) {
self.waiting_data
.retain(|_uni, data| data.recv_timestamp.elapsed() < E131_NETWORK_DATA_LOSS_TIMEOUT);
}
fn remove_expired_sources(&mut self) {
self.partially_discovered_sources
.retain(|s| s.last_updated.elapsed() < UNIVERSE_DISCOVERY_SOURCE_TIMEOUT);
self.discovered_sources
.retain(|s| s.last_updated.elapsed() < UNIVERSE_DISCOVERY_SOURCE_TIMEOUT);
}
}
impl Drop for SacnReceiver {
fn drop(&mut self) {
let universes = self.universes.clone();
for u in universes {
match self.mute_universe(u) {
Ok(_) => {}
Err(_e) => { }
}
}
}
}
fn find_discovered_src(srcs: &[DiscoveredSacnSource], cid: &Uuid) -> Option<usize> {
(0..srcs.len()).find(|&i| srcs[i].cid == *cid)
}
#[cfg(target_os = "windows")]
impl SacnNetworkReceiver {
fn new(ip: SocketAddr) -> Result<SacnNetworkReceiver> {
Ok(SacnNetworkReceiver {
socket: create_win_socket(ip)?,
addr: ip,
is_multicast_enabled: !(ip.is_ipv6()), })
}
fn listen_multicast_universe(&self, universe: u16) -> Result<()> {
let multicast_addr = if self.addr.is_ipv4() {
universe_to_ipv4_multicast_addr(universe)? } else {
universe_to_ipv6_multicast_addr(universe)? };
join_win_multicast(&self.socket, multicast_addr, self.addr.ip())
}
fn mute_multicast_universe(&mut self, universe: u16) -> Result<()> {
let multicast_addr = if self.addr.is_ipv4() {
universe_to_ipv4_multicast_addr(universe)? } else {
universe_to_ipv6_multicast_addr(universe)? };
leave_win_multicast(&self.socket, multicast_addr)
}
fn set_is_multicast_enabled(&mut self, val: bool) -> Result<()> {
if val && self.is_ipv6() {
return Err(SacnError::OsOperationUnsupported(
"IPv6 multicast is currently unsupported on Windows".to_string(),
));
}
self.is_multicast_enabled = val;
Ok(())
}
fn is_multicast_enabled(&self) -> bool {
self.is_multicast_enabled
}
fn set_only_v6(&mut self, val: bool) -> Result<()> {
if self.addr.is_ipv4() {
Err(SacnError::IpVersionError())
} else {
Ok(self.socket.set_only_v6(val)?)
}
}
fn recv<'a>(
&mut self,
buf: &'a mut [u8; RCV_BUF_DEFAULT_SIZE],
) -> Result<AcnRootLayerProtocol<'a>> {
let n = self.socket.read(buf)?;
if n > RCV_BUF_DEFAULT_SIZE {
return Err(SacnError::TooManyBytesRead(n, buf.len()));
}
AcnRootLayerProtocol::parse(buf)
}
fn set_timeout(&mut self, timeout: Option<Duration>) -> Result<()> {
Ok(self.socket.set_read_timeout(timeout)?)
}
fn is_ipv6(&self) -> bool {
self.addr.is_ipv6()
}
}
#[cfg(not(target_os = "windows"))]
impl SacnNetworkReceiver {
fn new(ip: SocketAddr) -> Result<SacnNetworkReceiver> {
Ok(SacnNetworkReceiver {
socket: create_unix_socket(ip)?,
addr: ip,
is_multicast_enabled: true, })
}
fn listen_multicast_universe(&self, universe: u16) -> Result<()> {
let multicast_addr = if self.addr.is_ipv4() {
universe_to_ipv4_multicast_addr(universe)? } else {
universe_to_ipv6_multicast_addr(universe)? };
join_unix_multicast(&self.socket, multicast_addr, self.addr.ip())
}
fn mute_multicast_universe(&mut self, universe: u16) -> Result<()> {
let multicast_addr = if self.addr.is_ipv4() {
universe_to_ipv4_multicast_addr(universe)?
} else {
universe_to_ipv6_multicast_addr(universe)?
};
leave_unix_multicast(&self.socket, multicast_addr, self.addr.ip())
}
fn set_is_multicast_enabled(&mut self, val: bool) -> Result<()> {
self.is_multicast_enabled = val;
Ok(())
}
fn is_multicast_enabled(&self) -> bool {
self.is_multicast_enabled
}
fn set_only_v6(&mut self, val: bool) -> Result<()> {
if self.addr.is_ipv4() {
Err(SacnError::IpVersionError())
} else {
Ok(self.socket.set_only_v6(val)?)
}
}
fn recv<'a>(
&mut self,
buf: &'a mut [u8; RCV_BUF_DEFAULT_SIZE],
) -> Result<AcnRootLayerProtocol<'a>> {
let n = self.socket.read(buf)?;
if n > RCV_BUF_DEFAULT_SIZE {
return Err(SacnError::TooManyBytesRead(n, buf.len()));
}
AcnRootLayerProtocol::parse(buf)
}
fn set_timeout(&mut self, timeout: Option<Duration>) -> Result<()> {
Ok(self.socket.set_read_timeout(timeout)?)
}
}
impl Clone for DMXData {
fn clone(&self) -> DMXData {
let new_vals = self.values.to_vec(); DMXData {
universe: self.universe,
values: new_vals,
sync_uni: self.sync_uni,
priority: self.priority,
src_cid: self.src_cid,
preview: self.preview,
recv_timestamp: self.recv_timestamp,
}
}
}
impl Ord for DMXData {
fn cmp(&self, other: &Self) -> Ordering {
self.universe
.cmp(&other.universe)
.then(self.sync_uni.cmp(&other.sync_uni))
.then(self.values.cmp(&other.values))
}
}
impl PartialOrd for DMXData {
fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
Some(self.cmp(other))
}
}
impl PartialEq for DMXData {
fn eq(&self, other: &Self) -> bool {
self.universe == other.universe
&& self.sync_uni == other.sync_uni
&& self.values == other.values
}
}
impl Eq for DMXData {}
impl DiscoveredSacnSource {
pub fn has_all_pages(&mut self) -> bool {
self.pages.sort_by(|a, b| a.page.cmp(&b.page));
for i in 0..=self.last_page {
if self
.pages
.get(i as usize)
.map(|p| p.page != i)
.unwrap_or(true)
{
return false;
}
}
true
}
pub fn get_all_universes(&self) -> Vec<u16> {
let mut uni: Vec<u16> = Vec::new();
for p in &self.pages {
uni.extend_from_slice(&p.universes);
}
uni
}
pub fn terminate_universe(&mut self, universe: u16) {
for p in &mut self.pages {
p.universes.retain(|x| *x != universe)
}
}
}
#[cfg(not(target_os = "windows"))]
fn create_unix_socket(addr: SocketAddr) -> Result<Socket> {
if addr.is_ipv4() {
let socket = Socket::new(Domain::IPV4, Type::DGRAM, Some(Protocol::UDP))?;
socket.set_reuse_port(true)?;
socket.set_reuse_address(true)?;
let socket_addr =
SocketAddr::new(IpAddr::V4(Ipv4Addr::UNSPECIFIED), ACN_SDT_MULTICAST_PORT);
socket.bind(&socket_addr.into())?;
Ok(socket)
} else {
let socket = Socket::new(Domain::IPV6, Type::DGRAM, Some(Protocol::UDP))?;
socket.set_reuse_port(true)?;
socket.set_reuse_address(true)?;
let socket_addr =
SocketAddr::new(IpAddr::V6(Ipv6Addr::UNSPECIFIED), ACN_SDT_MULTICAST_PORT);
socket.bind(&socket_addr.into())?;
Ok(socket)
}
}
#[cfg(not(target_os = "windows"))]
fn join_unix_multicast(socket: &Socket, addr: SockAddr, interface_addr: IpAddr) -> Result<()> {
match addr.family() as i32 {
AF_INET => match addr.as_socket_ipv4() {
Some(a) => match interface_addr {
IpAddr::V4(ref interface_v4) => {
socket
.join_multicast_v4(a.ip(), interface_v4)
.map_err(|e| {
SacnError::Io(std::io::Error::new(
e.kind(),
"Failed to join IPv4 multicast",
))
})?;
}
IpAddr::V6(ref _interface_v6) => {
return Err(SacnError::IpVersionError());
}
},
None => {
return Err(SacnError::UnsupportedIpVersion("IP version recognised as AF_INET but not actually usable as AF_INET so must be unknown type".to_string()));
}
},
AF_INET6 => match addr.as_socket_ipv6() {
Some(a) => {
socket.join_multicast_v6(a.ip(), 0).map_err(|e| {
SacnError::Io(std::io::Error::new(
e.kind(),
"Failed to join IPv6 multicast",
))
})?;
}
None => {
return Err(SacnError::UnsupportedIpVersion("IP version recognised as AF_INET6 but not actually usable as AF_INET6 so must be unknown type".to_string()));
}
},
x => {
return Err(SacnError::UnsupportedIpVersion(format!("IP version not recognised as AF_INET (Ipv4) or AF_INET6 (Ipv6) - family value (as i32): {}", x).to_string()));
}
};
Ok(())
}
#[cfg(not(target_os = "windows"))]
fn leave_unix_multicast(socket: &Socket, addr: SockAddr, interface_addr: IpAddr) -> Result<()> {
match addr.family() as i32 {
AF_INET => match addr.as_socket_ipv4() {
Some(a) => match interface_addr {
IpAddr::V4(ref interface_v4) => {
socket
.leave_multicast_v4(a.ip(), interface_v4)
.map_err(|e| {
SacnError::Io(std::io::Error::new(
e.kind(),
"Failed to leave IPv4 multicast",
))
})?;
}
IpAddr::V6(ref _interface_v6) => {
return Err(SacnError::IpVersionError());
}
},
None => {
return Err(SacnError::UnsupportedIpVersion("IP version recognised as AF_INET but not actually usable as AF_INET so must be unknown type".to_string()));
}
},
AF_INET6 => match addr.as_socket_ipv6() {
Some(a) => {
socket.leave_multicast_v6(a.ip(), 0).map_err(|e| {
SacnError::Io(std::io::Error::new(
e.kind(),
"Failed to leave IPv6 multicast",
))
})?;
}
None => {
return Err(SacnError::UnsupportedIpVersion("IP version recognised as AF_INET6 but not actually usable as AF_INET6 so must be unknown type".to_string()));
}
},
x => {
return Err(SacnError::UnsupportedIpVersion(format!("IP version not recognised as AF_INET (Ipv4) or AF_INET6 (Ipv6) - family value (as i32): {}", x).to_string()));
}
};
Ok(())
}
#[cfg(target_os = "windows")]
fn create_win_socket(addr: SocketAddr) -> Result<Socket> {
if addr.is_ipv4() {
let socket = Socket::new(Domain::IPV4, Type::DGRAM, Some(Protocol::UDP))?;
socket.set_reuse_address(true)?;
socket.bind(&SockAddr::from(addr))?;
Ok(socket)
} else {
let socket = Socket::new(Domain::IPV6, Type::DGRAM, Some(Protocol::UDP))?;
socket.set_reuse_address(true)?;
socket.bind(&SockAddr::from(addr))?;
Ok(socket)
}
}
#[cfg(target_os = "windows")]
fn join_win_multicast(socket: &Socket, addr: SockAddr, interface_addr: IpAddr) -> Result<()> {
match addr.family() as i32 {
AF_INET => match addr.as_socket_ipv4() {
Some(a) => match interface_addr {
IpAddr::V4(ref interface_v4) => {
socket
.join_multicast_v4(a.ip(), interface_v4)
.map_err(|e| {
SacnError::Io(std::io::Error::new(
e.kind(),
"Failed to join IPv4 multicast",
))
})?;
}
IpAddr::V6(ref _interface_v6) => {
return Err(SacnError::IpVersionError());
}
},
None => {
return Err(SacnError::UnsupportedIpVersion("IP version recognised as AF_INET but not actually usable as AF_INET so must be unknown type".to_string()));
}
},
AF_INET6 => match addr.as_socket_ipv6() {
Some(a) => {
socket.join_multicast_v6(a.ip(), 0).map_err(|e| {
SacnError::Io(std::io::Error::new(
e.kind(),
"Failed to join IPv6 multicast",
))
})?;
}
None => {
return Err(SacnError::UnsupportedIpVersion("IP version recognised as AF_INET6 but not actually usable as AF_INET6 so must be unknown type".to_string()));
}
},
x => {
return Err(SacnError::UnsupportedIpVersion(format!("IP version not recognised as AF_INET (Ipv4) or AF_INET6 (Ipv6) - family value (as i32): {x}").to_string()));
}
};
Ok(())
}
#[cfg(target_os = "windows")]
fn leave_win_multicast(socket: &Socket, addr: SockAddr) -> Result<()> {
match addr.family() as i32 {
AF_INET => match addr.as_socket_ipv4() {
Some(a) => {
socket
.leave_multicast_v4(a.ip(), &Ipv4Addr::new(0, 0, 0, 0))
.map_err(|e| {
SacnError::Io(std::io::Error::new(
e.kind(),
"Failed to leave IPv4 multicast",
))
})?;
}
None => {
return Err(SacnError::UnsupportedIpVersion("IP version recognised as AF_INET but not actually usable as AF_INET so must be unknown type".to_string()));
}
},
AF_INET6 => match addr.as_socket_ipv6() {
Some(_) => {
return Err(SacnError::OsOperationUnsupported(
"IPv6 multicast is currently unsupported on Windows".to_string(),
));
}
None => {
return Err(SacnError::UnsupportedIpVersion("IP version recognised as AF_INET6 but not actually usable as AF_INET6 so must be unknown type".to_string()));
}
},
x => {
return Err(SacnError::UnsupportedIpVersion(format!("IP version not recognised as AF_INET (Ipv4) or AF_INET6 (Ipv6) - family value (as i32): {x}").to_string()));
}
};
Ok(())
}
#[derive(Copy, Clone)]
struct TimedStampedSeqNo {
sequence_number: u8,
last_recv: Instant,
}
impl TimedStampedSeqNo {
fn new(sequence_number: u8, last_recv: Instant) -> TimedStampedSeqNo {
TimedStampedSeqNo {
sequence_number,
last_recv,
}
}
}
struct SequenceNumbering {
data_sequences: HashMap<Uuid, HashMap<u16, TimedStampedSeqNo>>,
sync_sequences: HashMap<Uuid, HashMap<u16, TimedStampedSeqNo>>,
}
impl SequenceNumbering {
fn new() -> SequenceNumbering {
SequenceNumbering {
data_sequences: HashMap::new(),
sync_sequences: HashMap::new(),
}
}
fn clear(&mut self) {
self.data_sequences.clear();
self.sync_sequences.clear();
}
fn check_timeouts(&mut self, announce_timeout: bool) -> Result<()> {
check_timeouts(
&mut self.data_sequences,
E131_NETWORK_DATA_LOSS_TIMEOUT,
announce_timeout,
)?;
check_timeouts(
&mut self.sync_sequences,
E131_NETWORK_DATA_LOSS_TIMEOUT,
announce_timeout,
)
}
fn check_data_seq_number(
&mut self,
source_limit: Option<usize>,
cid: Uuid,
sequence_number: u8,
universe: u16,
announce_timeout: bool,
) -> Result<()> {
check_seq_number(
&mut self.data_sequences,
source_limit,
cid,
sequence_number,
universe,
announce_timeout,
)
}
fn check_sync_seq_number(
&mut self,
source_limit: Option<usize>,
cid: Uuid,
sequence_number: u8,
sync_uni: u16,
announce_timeout: bool,
) -> Result<()> {
check_seq_number(
&mut self.sync_sequences,
source_limit,
cid,
sequence_number,
sync_uni,
announce_timeout,
)
}
fn remove_seq_numbers(&mut self, src_cid: Uuid, universe: u16) -> Result<()> {
remove_source_universe_seq(&mut self.data_sequences, src_cid, universe)?;
remove_source_universe_seq(&mut self.sync_sequences, src_cid, universe)
}
}
fn check_seq_number(
src_sequences: &mut HashMap<Uuid, HashMap<u16, TimedStampedSeqNo>>,
source_limit: Option<usize>,
cid: Uuid,
sequence_number: u8,
universe: u16,
announce_timeout: bool,
) -> Result<()> {
check_timeouts(
src_sequences,
E131_NETWORK_DATA_LOSS_TIMEOUT,
announce_timeout,
)?;
if src_sequences.get(&cid).is_none() {
if source_limit.is_none() || src_sequences.len() < source_limit.unwrap() {
src_sequences.insert(cid, HashMap::new());
} else {
return Err(SacnError::SourcesExceededError(src_sequences.len()));
}
};
let expected_seq = match src_sequences.get(&cid) {
Some(src) => {
match src.get(&universe) {
Some(s) => {
*s
}
None => {
let initial_seq_num = sequence_number.wrapping_sub(1);
TimedStampedSeqNo::new(initial_seq_num, Instant::now())
}
}
}
None => {
panic!();
}
};
let seq_diff = sequence_number.wrapping_sub(expected_seq.sequence_number) as i8;
if seq_diff as isize <= E131_SEQ_DIFF_DISCARD_UPPER_BOUND
&& seq_diff as isize > E131_SEQ_DIFF_DISCARD_LOWER_BOUND
{
return Err(SacnError::OutOfSequence(
sequence_number,
expected_seq.sequence_number,
seq_diff as isize,
));
}
match src_sequences.get_mut(&cid) {
Some(src) => {
src.insert(
universe,
TimedStampedSeqNo::new(sequence_number, Instant::now()),
);
}
None => {
panic!();
}
};
Ok(())
}
fn check_timeouts(
src_sequences: &mut HashMap<Uuid, HashMap<u16, TimedStampedSeqNo>>,
timeout: Duration,
announce_timeout: bool,
) -> Result<()> {
if announce_timeout {
let mut timedout_src_id: Option<Uuid> = None;
let mut timedout_uni: Option<u16> = None;
for (src_id, universes) in src_sequences.iter_mut() {
for (uni, seq_num) in universes.iter() {
if seq_num.last_recv.elapsed() >= timeout {
timedout_src_id = Some(*src_id);
timedout_uni = Some(*uni);
break;
}
}
if timedout_uni.is_none() {
break;
}
}
if let Some(uni_to_remove) = timedout_uni {
let src_universes = src_sequences.get_mut(&timedout_src_id.unwrap());
if let Some(universes) = src_universes {
universes.remove(&uni_to_remove);
if universes.is_empty() {
src_sequences.remove(&timedout_src_id.unwrap());
}
return Err(SacnError::UniverseTimeout(
timedout_src_id.unwrap(),
uni_to_remove,
));
}
}
Ok(())
} else {
for (_src_id, universes) in src_sequences.iter_mut() {
universes.retain(|_uni, seq_num| seq_num.last_recv.elapsed() < timeout);
}
src_sequences.retain(|_src_id, universes| !universes.is_empty());
Ok(())
}
}
fn remove_source_universe_seq(
src_sequences: &mut HashMap<Uuid, HashMap<u16, TimedStampedSeqNo>>,
src_cid: Uuid,
universe: u16,
) -> Result<()> {
match src_sequences.get_mut(&src_cid) {
Some(x) => {
match x.remove(&universe) {
Some(_) => {
if x.is_empty() {
match src_sequences.remove(&src_cid) {
Some(_x) => Ok(()),
None => Err(SacnError::SourceNotFound(src_cid)),
}
} else {
Ok(())
}
}
None => Err(SacnError::UniverseNotFound(universe)),
}
}
None => Err(SacnError::SourceNotFound(src_cid)),
}
}
pub fn discard_lowest_priority_then_previous(i: &DMXData, n: &DMXData) -> Result<DMXData> {
if i.priority > n.priority {
return Ok(i.clone());
}
Ok(n.clone())
}
pub fn htp_dmx_merge(i: &DMXData, n: &DMXData) -> Result<DMXData> {
if i.values.is_empty()
|| n.values.is_empty()
|| i.universe != n.universe
|| i.values[0] != n.values[0]
|| i.sync_uni != n.sync_uni
{
return Err(SacnError::DmxMergeError());
}
if i.priority > n.priority {
return Ok(i.clone());
} else if n.priority > i.priority {
return Ok(n.clone());
}
let mut r: DMXData = DMXData {
universe: i.universe,
values: Vec::new(),
sync_uni: i.sync_uni,
priority: i.priority,
src_cid: None,
preview: i.preview || n.preview, recv_timestamp: i.recv_timestamp,
};
let mut i_iter = i.values.iter();
let mut n_iter = n.values.iter();
let mut i_val = i_iter.next();
let mut n_val = n_iter.next();
while (i_val.is_some()) || (n_val.is_some()) {
if let (Some(&i), Some(&n)) = (i_val, n_val) {
r.values.push(max(i, n));
} else if let Some(&i) = i_val
&& n_val.is_none()
{
r.values.push(i);
} else if let Some(&n) = n_val
&& i_val.is_none()
{
r.values.push(n);
}
i_val = i_iter.next();
n_val = n_iter.next();
}
Ok(r)
}
#[cfg(test)]
mod test {
use super::*;
use std::borrow::Cow;
use std::net::{IpAddr, Ipv4Addr, SocketAddr};
use std::time::Instant;
use uuid::Uuid;
const TEST_DATA_SINGLE_UNIVERSE: [u8; 512] = [
1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 0, 0, 0, 0, 0, 0, 0,
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 100, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15,
16, 17, 18, 19, 20, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 100, 1, 2, 3, 4,
5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
0, 0, 0, 0, 0, 0, 0, 0, 100, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18,
19, 20, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 100, 1, 2, 3, 4, 5, 6, 7, 8,
9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
0, 0, 0, 0, 100, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12,
];
#[test]
fn test_handle_single_page_discovery_packet() {
let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), ACN_SDT_MULTICAST_PORT);
let mut dmx_rcv = SacnReceiver::with_ip(addr, None).unwrap();
let name = "Test Src 1";
let src_cid: Uuid = Uuid::from_bytes([
0xef, 0x07, 0xc8, 0xdd, 0x00, 0x64, 0x44, 0x01, 0xa3, 0xa2, 0x45, 0x9e, 0xf8, 0xe6,
0x14, 0x3e,
]);
let page: u8 = 0;
let last_page: u8 = 0;
let universes: Vec<u16> = vec![0, 1, 2, 3, 4, 5];
let discovery_pkt: UniverseDiscoveryPacketFramingLayer =
UniverseDiscoveryPacketFramingLayer {
source_name: name.into(),
data: UniverseDiscoveryPacketUniverseDiscoveryLayer {
page: page,
last_page: last_page,
universes: universes.clone().into(),
},
};
let res: Option<String> = dmx_rcv.handle_universe_discovery_packet(src_cid, discovery_pkt);
assert!(res.is_some());
assert_eq!(res.unwrap(), name);
assert_eq!(dmx_rcv.discovered_sources.len(), 1);
assert_eq!(dmx_rcv.discovered_sources[0].name, name);
assert_eq!(dmx_rcv.discovered_sources[0].cid, src_cid);
assert_eq!(dmx_rcv.discovered_sources[0].last_page, last_page);
assert_eq!(dmx_rcv.discovered_sources[0].pages.len(), 1);
assert_eq!(dmx_rcv.discovered_sources[0].pages[0].page, page);
assert_eq!(dmx_rcv.discovered_sources[0].pages[0].universes, universes);
}
#[test]
fn test_handle_multi_page_discovery_packet() {
let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), ACN_SDT_MULTICAST_PORT);
let mut dmx_rcv = SacnReceiver::with_ip(addr, None).unwrap();
let name = "Test Src 1";
let src_cid: Uuid = Uuid::from_bytes([
0xef, 0x07, 0xc8, 0xdd, 0x00, 0x64, 0x44, 0x01, 0xa3, 0xa2, 0x45, 0x9e, 0xf8, 0xe6,
0x14, 0x3e,
]);
let last_page: u8 = 1;
let mut universes_page_1: Vec<u16> = Vec::new();
let mut universes_page_2: Vec<u16> = Vec::new();
for i in 1..513 {
universes_page_1.push(i);
}
for i in 513..1024 {
universes_page_2.push(i);
}
let discovery_pkt_1: UniverseDiscoveryPacketFramingLayer =
UniverseDiscoveryPacketFramingLayer {
source_name: name.into(),
data: UniverseDiscoveryPacketUniverseDiscoveryLayer {
page: 0,
last_page,
universes: universes_page_1.clone().into(),
},
};
let discovery_pkt_2: UniverseDiscoveryPacketFramingLayer =
UniverseDiscoveryPacketFramingLayer {
source_name: name.into(),
data: UniverseDiscoveryPacketUniverseDiscoveryLayer {
page: 1,
last_page,
universes: universes_page_2.clone().into(),
},
};
let res: Option<String> = dmx_rcv.handle_universe_discovery_packet(src_cid, discovery_pkt_1);
assert!(res.is_none());
let res2: Option<String> = dmx_rcv.handle_universe_discovery_packet(src_cid, discovery_pkt_2);
assert!(res2.is_some()); assert_eq!(res2.unwrap(), name);
assert_eq!(dmx_rcv.discovered_sources.len(), 1);
assert_eq!(dmx_rcv.discovered_sources[0].name, name);
assert_eq!(dmx_rcv.discovered_sources[0].cid, src_cid);
assert_eq!(dmx_rcv.discovered_sources[0].last_page, last_page);
assert_eq!(dmx_rcv.discovered_sources[0].pages.len(), 2);
assert_eq!(dmx_rcv.discovered_sources[0].pages[0].page, 0);
assert_eq!(dmx_rcv.discovered_sources[0].pages[1].page, 1);
assert_eq!(
dmx_rcv.discovered_sources[0].pages[0].universes,
universes_page_1
);
assert_eq!(
dmx_rcv.discovered_sources[0].pages[1].universes,
universes_page_2
);
}
#[test]
fn test_store_retrieve_waiting_data() {
let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), ACN_SDT_MULTICAST_PORT);
let mut dmx_rcv = SacnReceiver::with_ip(addr, None).unwrap();
let sync_uni: u16 = 1;
let universe: u16 = 1;
let vals: Vec<u8> = vec![0, 1, 2, 3, 4, 5, 6, 7, 8, 9];
let dmx_data = DMXData {
universe: universe,
values: vals.clone(),
sync_uni: sync_uni,
priority: 100,
src_cid: None,
preview: false,
recv_timestamp: Instant::now(),
};
dmx_rcv.store_waiting_data(dmx_data).unwrap();
let res: Vec<DMXData> = dmx_rcv.rtrv_waiting_data(sync_uni);
assert_eq!(res.len(), 1);
assert_eq!(res[0].universe, universe);
assert_eq!(res[0].sync_uni, sync_uni);
assert_eq!(res[0].values, vals);
}
#[test]
fn test_store_2_retrieve_1_waiting_data() {
let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), ACN_SDT_MULTICAST_PORT);
let mut dmx_rcv = SacnReceiver::with_ip(addr, None).unwrap();
let sync_uni: u16 = 1;
let universe: u16 = 1;
let vals: Vec<u8> = vec![0, 1, 2, 3, 4, 5, 6, 7, 8, 9];
let dmx_data = DMXData {
universe: universe,
values: vals.clone(),
sync_uni: sync_uni,
priority: 100,
src_cid: None,
preview: false,
recv_timestamp: Instant::now(),
};
let dmx_data2 = DMXData {
universe: universe + 1,
values: vals.clone(),
sync_uni: sync_uni + 1,
priority: 100,
src_cid: None,
preview: false,
recv_timestamp: Instant::now(),
};
dmx_rcv.store_waiting_data(dmx_data).unwrap();
dmx_rcv.store_waiting_data(dmx_data2).unwrap();
let res: Vec<DMXData> = dmx_rcv.rtrv_waiting_data(sync_uni);
assert_eq!(res.len(), 1);
assert_eq!(res[0].universe, universe);
assert_eq!(res[0].sync_uni, sync_uni);
assert_eq!(res[0].values, vals);
}
#[test]
fn test_store_2_retrieve_2_waiting_data() {
let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), ACN_SDT_MULTICAST_PORT);
let mut dmx_rcv = SacnReceiver::with_ip(addr, None).unwrap();
let sync_uni: u16 = 1;
let universe: u16 = 1;
let vals: Vec<u8> = vec![0, 1, 2, 3, 4, 5, 6, 7, 8, 9];
let dmx_data = DMXData {
universe: universe,
values: vals.clone(),
sync_uni: sync_uni,
priority: 100,
src_cid: None,
preview: false,
recv_timestamp: Instant::now(),
};
let vals2: Vec<u8> = vec![0, 9, 7, 3, 2, 4, 5, 6, 5, 1, 2, 3];
let dmx_data2 = DMXData {
universe: universe + 1,
values: vals2.clone(),
sync_uni: sync_uni + 1,
priority: 100,
src_cid: None,
preview: false,
recv_timestamp: Instant::now(),
};
dmx_rcv.store_waiting_data(dmx_data).unwrap();
dmx_rcv.store_waiting_data(dmx_data2).unwrap();
let res: Vec<DMXData> = dmx_rcv.rtrv_waiting_data(sync_uni);
assert_eq!(res.len(), 1);
assert_eq!(res[0].universe, universe);
assert_eq!(res[0].sync_uni, sync_uni);
assert_eq!(res[0].values, vals);
let res2: Vec<DMXData> = dmx_rcv.rtrv_waiting_data(sync_uni + 1);
assert_eq!(res2.len(), 1);
assert_eq!(res2[0].universe, universe + 1);
assert_eq!(res2[0].sync_uni, sync_uni + 1);
assert_eq!(res2[0].values, vals2);
}
#[test]
fn test_store_2_same_universe_same_priority_waiting_data() {
let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), ACN_SDT_MULTICAST_PORT);
let mut dmx_rcv = SacnReceiver::with_ip(addr, None).unwrap();
let sync_uni: u16 = 1;
let universe: u16 = 1;
let vals: Vec<u8> = vec![0, 1, 2, 3, 4, 5, 6, 7, 8, 9];
let dmx_data = DMXData {
universe: universe,
values: vals.clone(),
sync_uni: sync_uni,
priority: 100,
src_cid: None,
preview: false,
recv_timestamp: Instant::now(),
};
let vals2: Vec<u8> = vec![0, 9, 7, 3, 2, 4, 5, 6, 5, 1, 2, 3];
let dmx_data2 = DMXData {
universe: universe,
values: vals2.clone(),
sync_uni: sync_uni,
priority: 100,
src_cid: None,
preview: false,
recv_timestamp: Instant::now(),
};
dmx_rcv.store_waiting_data(dmx_data).unwrap();
dmx_rcv.store_waiting_data(dmx_data2).unwrap();
let res2: Vec<DMXData> = dmx_rcv.rtrv_waiting_data(sync_uni);
assert_eq!(res2.len(), 1);
assert_eq!(res2[0].universe, universe);
assert_eq!(res2[0].sync_uni, sync_uni);
assert_eq!(res2[0].values, vals2);
assert_eq!(dmx_rcv.rtrv_waiting_data(sync_uni).len(), 0);
}
#[test]
fn test_store_2_same_universe_diff_priority_waiting_data() {
let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), ACN_SDT_MULTICAST_PORT);
let mut dmx_rcv = SacnReceiver::with_ip(addr, None).unwrap();
let sync_uni: u16 = 1;
let universe: u16 = 1;
let vals: Vec<u8> = vec![0, 1, 2, 3, 4, 5, 6, 7, 8, 9];
let dmx_data = DMXData {
universe: universe,
values: vals.clone(),
sync_uni: sync_uni,
priority: 120,
src_cid: None,
preview: false,
recv_timestamp: Instant::now(),
};
let vals2: Vec<u8> = vec![0, 9, 7, 3, 2, 4, 5, 6, 5, 1, 2, 3];
let dmx_data2 = DMXData {
universe: universe,
values: vals2.clone(),
sync_uni: sync_uni,
priority: 100,
src_cid: None,
preview: false,
recv_timestamp: Instant::now(),
};
dmx_rcv.store_waiting_data(dmx_data).unwrap();
dmx_rcv.store_waiting_data(dmx_data2).unwrap();
let res: Vec<DMXData> = dmx_rcv.rtrv_waiting_data(sync_uni);
assert_eq!(res.len(), 1);
assert_eq!(res[0].universe, universe);
assert_eq!(res[0].sync_uni, sync_uni);
assert_eq!(res[0].values, vals);
assert_eq!(dmx_rcv.rtrv_waiting_data(sync_uni).len(), 0);
}
fn generate_data_packet_framing_layer_seq_num<'a>(
universe: u16,
sequence_number: u8,
) -> DataPacketFramingLayer<'a> {
DataPacketFramingLayer {
source_name: "Source_A".into(),
priority: 100,
synchronization_address: 0,
sequence_number: sequence_number,
preview_data: false,
stream_terminated: false,
force_synchronization: false,
universe: universe,
data: DataPacketDmpLayer {
property_values: Cow::from(&TEST_DATA_SINGLE_UNIVERSE[0..]),
},
}
}
fn generate_sync_packet_framing_layer_seq_num<'a>(
sync_address: u16,
sequence_number: u8,
) -> SynchronizationPacketFramingLayer {
SynchronizationPacketFramingLayer {
sequence_number: sequence_number,
synchronization_address: sync_address,
}
}
#[test]
fn test_data_packet_sequence_number_below_expected() {
const UNIVERSE1: u16 = 1;
let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), ACN_SDT_MULTICAST_PORT);
let mut dmx_rcv = SacnReceiver::with_ip(addr, None).unwrap();
dmx_rcv.listen_universes(&[UNIVERSE1]).unwrap();
let src_cid: Uuid = Uuid::from_bytes([
0xef, 0x07, 0xc8, 0xdd, 0x00, 0x64, 0x44, 0x01, 0xa3, 0xa2, 0x45, 0x9e, 0xf8, 0xe6,
0x14, 0x3e,
]);
let data_packet = generate_data_packet_framing_layer_seq_num(UNIVERSE1, 0);
let data_packet2 = generate_data_packet_framing_layer_seq_num(UNIVERSE1, 1);
let data_packet3 = generate_data_packet_framing_layer_seq_num(UNIVERSE1, 0);
assert!(
dmx_rcv
.handle_data_packet(src_cid, data_packet)
.unwrap()
.is_some(),
"Receiver incorrectly rejected first data packet"
);
assert!(
dmx_rcv
.handle_data_packet(src_cid, data_packet2)
.unwrap()
.is_some(),
"Receiver incorrectly rejected second data packet"
);
match dmx_rcv.handle_data_packet(src_cid, data_packet3) {
Err(SacnError::OutOfSequence(..)) => {
assert!(
true,
"Receiver correctly rejected third data packet with correct error"
);
}
Ok(_) => {
assert!(false, "Receiver incorrectly accepted third data packet");
}
Err(e) => {
assert!(
false,
"Receiver correctly rejected third data packet but with unexpected error: {}",
e
);
}
}
}
#[test]
fn test_data_packet_sequence_number_exhaustive() {
const UNIVERSE1: u16 = 1;
const SEQ_NUM_LOWER_BOUND: u8 = 0;
const SEQ_NUM_UPPER_BOUND: u8 = 255;
const LAST_SEQ_NUM: u8 = 1;
const REJECT_RANGE_UPPER_BOUND: i16 = 0;
const REJECT_RANGE_LOWER_BOUND: i16 = -20;
let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), ACN_SDT_MULTICAST_PORT);
let src_cid: Uuid = Uuid::from_bytes([
0xef, 0x07, 0xc8, 0xdd, 0x00, 0x64, 0x44, 0x01, 0xa3, 0xa2, 0x45, 0x9e, 0xf8, 0xe6,
0x14, 0x3e,
]);
for i in SEQ_NUM_LOWER_BOUND..SEQ_NUM_UPPER_BOUND {
let mut dmx_rcv = SacnReceiver::with_ip(addr, None).unwrap();
dmx_rcv.listen_universes(&[UNIVERSE1]).unwrap();
let data_packet =
generate_data_packet_framing_layer_seq_num(UNIVERSE1, LAST_SEQ_NUM - 1);
let data_packet2 = generate_data_packet_framing_layer_seq_num(UNIVERSE1, LAST_SEQ_NUM);
assert!(
dmx_rcv
.handle_data_packet(src_cid, data_packet)
.unwrap()
.is_some(),
"Receiver incorrectly rejected first data packet"
);
assert!(
dmx_rcv
.handle_data_packet(src_cid, data_packet2)
.unwrap()
.is_some(),
"Receiver incorrectly rejected second data packet"
);
let res = dmx_rcv.handle_data_packet(
src_cid,
generate_data_packet_framing_layer_seq_num(UNIVERSE1, i),
);
let diff: i16 = (i.wrapping_sub(LAST_SEQ_NUM) as i8) as i16;
match res {
Err(SacnError::OutOfSequence(..)) => {
if (diff <= REJECT_RANGE_UPPER_BOUND) && (diff > REJECT_RANGE_LOWER_BOUND) {
assert!(
true,
"Rejection is correct as per ANSI E1.31-2018 Section 6.7.2"
);
} else {
assert!(
false,
"Data packet with sequence number: {} was rejected incorrectly",
i
);
}
}
Ok(_p) => {
if (diff <= REJECT_RANGE_UPPER_BOUND) && (diff > REJECT_RANGE_LOWER_BOUND) {
assert!(
false,
"Data packet with sequence number: {} was accepted incorrectly",
1
);
} else {
assert!(
true,
"Acceptance is correct as per ANSI E1.31-2018 Section 6.7.2"
);
}
}
Err(e) => {
assert!(false, "Receiver produced unexpected error: {}", e);
}
}
}
}
#[test]
fn test_sync_packet_sequence_number_exhaustive() {
const SYNC_ADDR: u16 = 1;
const SEQ_NUM_LOWER_BOUND: u8 = 0;
const SEQ_NUM_UPPER_BOUND: u8 = 255;
const LAST_SEQ_NUM: u8 = 1;
const REJECT_RANGE_UPPER_BOUND: i16 = 0;
const REJECT_RANGE_LOWER_BOUND: i16 = -20;
let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), ACN_SDT_MULTICAST_PORT);
let src_cid: Uuid = Uuid::from_bytes([
0xef, 0x07, 0xc8, 0xdd, 0x00, 0x64, 0x44, 0x01, 0xa3, 0xa2, 0x45, 0x9e, 0xf8, 0xe6,
0x14, 0x3e,
]);
for i in SEQ_NUM_LOWER_BOUND..SEQ_NUM_UPPER_BOUND {
let mut dmx_rcv = SacnReceiver::with_ip(addr, None).unwrap();
dmx_rcv.listen_universes(&[SYNC_ADDR]).unwrap();
let sync_packet =
generate_sync_packet_framing_layer_seq_num(SYNC_ADDR, LAST_SEQ_NUM - 1);
let sync_packet2 = generate_sync_packet_framing_layer_seq_num(SYNC_ADDR, LAST_SEQ_NUM);
assert!(
dmx_rcv
.handle_sync_packet(src_cid, sync_packet)
.unwrap()
.is_none(),
"Receiver incorrectly rejected first sync packet"
);
assert!(
dmx_rcv
.handle_sync_packet(src_cid, sync_packet2)
.unwrap()
.is_none(),
"Receiver incorrectly rejected second sync packet"
);
let res = dmx_rcv.handle_sync_packet(
src_cid,
generate_sync_packet_framing_layer_seq_num(SYNC_ADDR, i),
);
let diff: i16 = (i.wrapping_sub(LAST_SEQ_NUM) as i8) as i16;
match res {
Err(SacnError::OutOfSequence(..)) => {
if (diff <= REJECT_RANGE_UPPER_BOUND) && (diff > REJECT_RANGE_LOWER_BOUND) {
assert!(
true,
"Rejection is correct as per ANSI E1.31-2018 Section 6.7.2"
);
} else {
assert!(
false,
"Sync packet with sequence number: {} was rejected incorrectly",
i
);
}
}
Ok(_p) => {
if (diff <= REJECT_RANGE_UPPER_BOUND) && (diff > REJECT_RANGE_LOWER_BOUND) {
assert!(
false,
"Sync packet with sequence number: {} was accepted incorrectly",
i
);
} else {
assert!(
true,
"Acceptance is correct as per ANSI E1.31-2018 Section 6.7.2"
);
}
}
Err(e) => {
assert!(false, "Receiver produced unexpected error: {}", e);
}
}
}
}
#[test]
fn test_sync_packet_sequence_number_below_expected() {
const UNIVERSE1: u16 = 1;
let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), ACN_SDT_MULTICAST_PORT);
let mut dmx_rcv = SacnReceiver::with_ip(addr, None).unwrap();
dmx_rcv.listen_universes(&[UNIVERSE1]).unwrap();
let src_cid: Uuid = Uuid::from_bytes([
0xef, 0x07, 0xc8, 0xdd, 0x00, 0x64, 0x44, 0x01, 0xa3, 0xa2, 0x45, 0x9e, 0xf8, 0xe6,
0x14, 0x3e,
]);
let sync_packet = generate_sync_packet_framing_layer_seq_num(UNIVERSE1, 0);
let sync_packet2 = generate_sync_packet_framing_layer_seq_num(UNIVERSE1, 1);
let sync_packet3 = generate_sync_packet_framing_layer_seq_num(UNIVERSE1, 0);
assert!(
dmx_rcv
.handle_sync_packet(src_cid, sync_packet)
.unwrap()
.is_none(),
"Receiver incorrectly rejected first sync packet"
);
assert!(
dmx_rcv
.handle_sync_packet(src_cid, sync_packet2)
.unwrap()
.is_none(),
"Receiver incorrectly rejected second sync packet"
);
match dmx_rcv.handle_sync_packet(src_cid, sync_packet3) {
Err(SacnError::OutOfSequence(..)) => {
assert!(
true,
"Receiver correctly rejected third sync packet with correct error"
);
}
Ok(_) => {
assert!(false, "Receiver incorrectly accepted third sync packet");
}
Err(e) => {
assert!(
false,
"Receiver correctly rejected third sync packet but with unexpected error: {}",
e
);
}
}
}
#[test]
fn test_sync_packet_sequence_number_reset() {
const UNIVERSE1: u16 = 1;
let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), ACN_SDT_MULTICAST_PORT);
let mut dmx_rcv = SacnReceiver::with_ip(addr, None).unwrap();
dmx_rcv.listen_universes(&[UNIVERSE1]).unwrap();
let src_cid: Uuid = Uuid::from_bytes([
0xef, 0x07, 0xc8, 0xdd, 0x00, 0x64, 0x44, 0x01, 0xa3, 0xa2, 0x45, 0x9e, 0xf8, 0xe6,
0x14, 0x3e,
]);
let sync_packet = generate_sync_packet_framing_layer_seq_num(UNIVERSE1, 0);
let sync_packet2 = generate_sync_packet_framing_layer_seq_num(UNIVERSE1, 1);
let sync_packet3 = generate_sync_packet_framing_layer_seq_num(UNIVERSE1, 0);
assert!(
dmx_rcv
.handle_sync_packet(src_cid, sync_packet)
.unwrap()
.is_none(),
"Receiver incorrectly rejected first sync packet"
);
assert!(
dmx_rcv
.handle_sync_packet(src_cid, sync_packet2)
.unwrap()
.is_none(),
"Receiver incorrectly rejected second sync packet"
);
dmx_rcv.reset_sources();
assert!(
dmx_rcv
.handle_sync_packet(src_cid, sync_packet3)
.unwrap()
.is_none(),
"Receiver incorrectly rejected third sync packet"
);
}
#[test]
fn test_data_packet_sequence_number_reset() {
const UNIVERSE1: u16 = 1;
let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), ACN_SDT_MULTICAST_PORT);
let mut dmx_rcv = SacnReceiver::with_ip(addr, None).unwrap();
dmx_rcv.listen_universes(&[UNIVERSE1]).unwrap();
let src_cid: Uuid = Uuid::from_bytes([
0xef, 0x07, 0xc8, 0xdd, 0x00, 0x64, 0x44, 0x01, 0xa3, 0xa2, 0x45, 0x9e, 0xf8, 0xe6,
0x14, 0x3e,
]);
let data_packet = generate_data_packet_framing_layer_seq_num(UNIVERSE1, 0);
let data_packet2 = generate_data_packet_framing_layer_seq_num(UNIVERSE1, 1);
let data_packet3 = generate_data_packet_framing_layer_seq_num(UNIVERSE1, 0);
assert!(
dmx_rcv
.handle_data_packet(src_cid, data_packet)
.unwrap()
.is_some(),
"Receiver incorrectly rejected first data packet"
);
assert!(
dmx_rcv
.handle_data_packet(src_cid, data_packet2)
.unwrap()
.is_some(),
"Receiver incorrectly rejected second data packet"
);
dmx_rcv.reset_sources();
assert!(
dmx_rcv
.handle_data_packet(src_cid, data_packet3)
.unwrap()
.is_some(),
"Receiver incorrectly rejected third data packet"
);
}
#[test]
fn test_sequence_number_packet_type_independence() {
const UNIVERSE: u16 = 1;
let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), ACN_SDT_MULTICAST_PORT);
let mut dmx_rcv = SacnReceiver::with_ip(addr, None).unwrap();
dmx_rcv.listen_universes(&[UNIVERSE]).unwrap();
let src_cid: Uuid = Uuid::from_bytes([
0xef, 0x07, 0xc8, 0xdd, 0x00, 0x64, 0x44, 0x01, 0xa3, 0xa2, 0x45, 0x9e, 0xf8, 0xe6,
0x14, 0x3e,
]);
let data_packet = generate_data_packet_framing_layer_seq_num(UNIVERSE, 0);
let data_packet2 = generate_data_packet_framing_layer_seq_num(UNIVERSE, 1);
let sync_packet = generate_sync_packet_framing_layer_seq_num(UNIVERSE, 0);
assert!(
dmx_rcv
.handle_data_packet(src_cid, data_packet)
.unwrap()
.is_some(),
"Receiver incorrectly rejected first data packet"
);
assert!(
dmx_rcv
.handle_data_packet(src_cid, data_packet2)
.unwrap()
.is_some(),
"Receiver incorrectly rejected second data packet"
);
assert!(
dmx_rcv
.handle_sync_packet(src_cid, sync_packet)
.unwrap()
.is_none(),
"Receiver incorrectly rejected synchronisation packet"
);
}
#[test]
fn test_data_packet_sequence_number_universe_independence() {
const UNIVERSE1: u16 = 1;
const UNIVERSE2: u16 = 2;
let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), ACN_SDT_MULTICAST_PORT);
let mut dmx_rcv = SacnReceiver::with_ip(addr, None).unwrap();
dmx_rcv.listen_universes(&[UNIVERSE1, UNIVERSE2]).unwrap();
let src_cid: Uuid = Uuid::from_bytes([
0xef, 0x07, 0xc8, 0xdd, 0x00, 0x64, 0x44, 0x01, 0xa3, 0xa2, 0x45, 0x9e, 0xf8, 0xe6,
0x14, 0x3e,
]);
let data_packet = generate_data_packet_framing_layer_seq_num(UNIVERSE1, 0);
let data_packet2 = generate_data_packet_framing_layer_seq_num(UNIVERSE1, 1);
let data_packet3 = generate_data_packet_framing_layer_seq_num(UNIVERSE2, 0);
assert!(
dmx_rcv
.handle_data_packet(src_cid, data_packet)
.unwrap()
.is_some(),
"Receiver incorrectly rejected first data packet"
);
assert!(
dmx_rcv
.handle_data_packet(src_cid, data_packet2)
.unwrap()
.is_some(),
"Receiver incorrectly rejected second data packet"
);
assert!(
dmx_rcv
.handle_data_packet(src_cid, data_packet3)
.unwrap()
.is_some(),
"Receiver incorrectly rejected third data packet"
);
}
#[test]
fn test_sync_packet_sequence_number_universe_independence() {
const SYNC_ADDR_1: u16 = 1;
const SYNC_ADDR_2: u16 = 2;
let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), ACN_SDT_MULTICAST_PORT);
let mut dmx_rcv = SacnReceiver::with_ip(addr, None).unwrap();
dmx_rcv
.listen_universes(&[SYNC_ADDR_1, SYNC_ADDR_2])
.unwrap();
let src_cid: Uuid = Uuid::from_bytes([
0xef, 0x07, 0xc8, 0xdd, 0x00, 0x64, 0x44, 0x01, 0xa3, 0xa2, 0x45, 0x9e, 0xf8, 0xe6,
0x14, 0x3e,
]);
let sync_packet = generate_sync_packet_framing_layer_seq_num(SYNC_ADDR_1, 0);
let sync_packet2 = generate_sync_packet_framing_layer_seq_num(SYNC_ADDR_1, 1);
let sync_packet3 = generate_sync_packet_framing_layer_seq_num(SYNC_ADDR_2, 0);
assert!(
dmx_rcv
.handle_sync_packet(src_cid, sync_packet)
.unwrap()
.is_none(),
"Receiver incorrectly rejected first sync packet"
);
assert!(
dmx_rcv
.handle_sync_packet(src_cid, sync_packet2)
.unwrap()
.is_none(),
"Receiver incorrectly rejected second sync packet"
);
assert!(
dmx_rcv
.handle_sync_packet(src_cid, sync_packet3)
.unwrap()
.is_none(),
"Receiver incorrectly rejected third sync packet"
);
}
#[test]
fn test_source_limit_0() {
let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), ACN_SDT_MULTICAST_PORT);
let source_limit: Option<usize> = Some(0);
match SacnReceiver::with_ip(addr, source_limit) {
Err(e) => match e {
SacnError::SourceLimitZero() => {
assert!(true, "Correct error returned");
}
_ => {
assert!(false, "Unexpected error type returned");
}
},
_ => {
assert!(
false,
"SacnReceiver accepted 0 source limit when it shouldn't"
);
}
}
}
#[test]
fn test_is_multicast_enabled() {
let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), ACN_SDT_MULTICAST_PORT);
let dmx_rcv = SacnReceiver::with_ip(addr, None).unwrap();
assert!(
dmx_rcv.is_multicast_enabled(),
"Multicast not enabled by default"
);
}
#[test]
fn test_set_is_multicast_enabled() {
let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), ACN_SDT_MULTICAST_PORT);
let mut dmx_rcv = SacnReceiver::with_ip(addr, None).unwrap();
dmx_rcv.set_is_multicast_enabled(false).unwrap();
assert!(
!dmx_rcv.is_multicast_enabled(),
"Multicast not disabled correctly"
);
}
#[test]
fn test_clear_waiting_data() {
let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), ACN_SDT_MULTICAST_PORT);
let mut dmx_rcv = SacnReceiver::with_ip(addr, None).unwrap();
const SYNC_ADDR: u16 = 1;
let data: DMXData = DMXData {
universe: 0,
values: vec![1, 2, 3],
sync_uni: SYNC_ADDR,
priority: 100,
src_cid: Some(Uuid::new_v4()),
preview: false,
recv_timestamp: Instant::now(),
};
dmx_rcv.store_waiting_data(data).unwrap();
dmx_rcv.clear_all_waiting_data();
assert_eq!(
dmx_rcv.rtrv_waiting_data(SYNC_ADDR),
Vec::new(),
"Data was not reset as expected"
);
}
#[test]
fn test_get_announce_source_discovery() {
let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), ACN_SDT_MULTICAST_PORT);
let dmx_rcv = SacnReceiver::with_ip(addr, None).unwrap();
assert!(
!dmx_rcv.get_announce_source_discovery(),
"Announce source discovery is true by default when should be false"
);
}
#[test]
fn test_get_announce_timeout() {
let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), ACN_SDT_MULTICAST_PORT);
let dmx_rcv = SacnReceiver::with_ip(addr, None).unwrap();
assert!(
!dmx_rcv.get_announce_timeout(),
"Announce timeout flag is true by default when should be false"
);
}
#[test]
fn test_get_announce_stream_termination() {
let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), ACN_SDT_MULTICAST_PORT);
let dmx_rcv = SacnReceiver::with_ip(addr, None).unwrap();
assert!(
!dmx_rcv.get_announce_stream_termination(),
"Announce termination flag is true by default when should be false"
);
}
#[test]
fn test_handle_sync_packet_not_listening_to_sync_addr() {
let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), ACN_SDT_MULTICAST_PORT);
let mut dmx_rcv = SacnReceiver::with_ip(addr, None).unwrap();
let res = dmx_rcv
.handle_sync_packet(
Uuid::new_v4(),
SynchronizationPacketFramingLayer {
sequence_number: 0,
synchronization_address: 1,
},
)
.unwrap();
assert_eq!(
res, None,
"Sync packet produced output when should have been ignored as for an address that isn't being listened to"
);
}
#[test]
fn test_dmx_data_eq() {
const UNIVERSE: u16 = 1;
let values: Vec<u8> = vec![1, 2, 3];
const SYNC_ADDR: u16 = 1;
const PRIORITY: u8 = 100;
const PREVIEW: bool = false;
let data1 = DMXData {
universe: UNIVERSE,
values: values.clone(),
sync_uni: SYNC_ADDR,
priority: PRIORITY,
src_cid: Some(Uuid::new_v4()),
preview: PREVIEW,
recv_timestamp: Instant::now(),
};
let data2 = DMXData {
universe: UNIVERSE,
values: values,
sync_uni: SYNC_ADDR,
priority: PRIORITY + 50,
src_cid: None,
preview: !PREVIEW,
recv_timestamp: Instant::now(),
};
assert_eq!(
data1, data2,
"DMX data not seen as equivalent when should be"
);
}
#[test]
fn test_data_sequence_number_wraparound_255_to_0() {
const UNIVERSE: u16 = 1;
let src_cid: Uuid = Uuid::from_bytes([
0xef, 0x07, 0xc8, 0xdd, 0x00, 0x64, 0x44, 0x01, 0xa3, 0xa2, 0x45, 0x9e, 0xf8, 0xe6,
0x14, 0x3e,
]);
let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), ACN_SDT_MULTICAST_PORT);
let mut rcv = SacnReceiver::with_ip(addr, None).unwrap();
rcv.listen_universes(&[UNIVERSE]).unwrap();
let pkt = generate_data_packet_framing_layer_seq_num(UNIVERSE, 21u8);
let _ = rcv.handle_data_packet(src_cid, pkt);
for seq in 250u8..=255u8 {
let pkt = generate_data_packet_framing_layer_seq_num(UNIVERSE, seq);
let res = rcv.handle_data_packet(src_cid, pkt);
assert!(
res.is_ok(),
"sequence {} should be accepted (got {:?})",
seq,
res
);
}
let pkt0 = generate_data_packet_framing_layer_seq_num(UNIVERSE, 0);
let res0 = rcv.handle_data_packet(src_cid, pkt0);
assert!(
res0.is_ok(),
"sequence wrap 255->0 should be accepted (got {:?})",
res0
);
let pkt1 = generate_data_packet_framing_layer_seq_num(UNIVERSE, 1);
let res1 = rcv.handle_data_packet(src_cid, pkt1);
assert!(
res1.is_ok(),
"sequence 1 after wrap should be accepted (got {:?})",
res1
);
}
}