use std::{
collections::HashMap,
io::ErrorKind,
net::Ipv4Addr,
sync::{atomic, Arc, Mutex, RwLock, Weak},
thread,
thread::JoinHandle,
time::{Duration, Instant},
};
use mio_extras::channel as mio_channel;
use mio::Token;
#[allow(unused_imports)]
use log::{debug, error, info, trace, warn};
use crate::{
dds::{
dp_event_loop::DPEventLoop, pubsub::*, qos::*, reader::*, topic::*, typedesc::TypeDesc,
values::result::*, writer::WriterIngredients,
},
discovery::{
data_types::topic_data::DiscoveredTopicData,
discovery::{Discovery, DiscoveryCommand},
discovery_db::DiscoveryDB,
},
log_and_err_internal,
network::{constant::*, udp_listener::UDPListener},
structure::{dds_cache::DDSCache, entity::RTPSEntity, guid::*, locator::Locator},
};
use super::dp_event_loop::DomainInfo;
#[derive(Clone)]
pub struct DomainParticipant {
dpi: Arc<Mutex<DomainParticipantDisc>>,
}
#[allow(clippy::new_without_default)]
impl DomainParticipant {
pub fn new(domain_id: u16) -> Result<Self> {
trace!("DomainParticipant construct start");
let (djh_sender, djh_receiver) = mio_channel::channel();
let (spdp_liveness_sender, spdp_liveness_receiver) = mio_channel::sync_channel(8);
let (discovery_updated_sender, discovery_update_notification_receiver) =
mio_channel::sync_channel::<DiscoveryNotificationType>(32);
let (discovery_command_sender, discovery_command_receiver) =
mio_channel::sync_channel::<DiscoveryCommand>(64);
let dp = DomainParticipantDisc::new(
domain_id,
djh_receiver,
discovery_update_notification_receiver,
discovery_command_sender,
spdp_liveness_sender,
)?;
let self_locators = dp.self_locators();
let dp = Self {
dpi: Arc::new(Mutex::new(dp)),
};
let (discovery_started_sender, discovery_started_receiver) =
std::sync::mpsc::channel::<Result<()>>();
let dp_clone = dp.weak_clone();
let disc_db_clone = dp.discovery_db();
let discovery_handle = thread::Builder::new()
.name("RustDDS discovery thread".to_string())
.spawn(move || {
if let Ok(mut discovery) = Discovery::new(
dp_clone,
disc_db_clone,
discovery_started_sender,
discovery_updated_sender,
discovery_command_receiver,
spdp_liveness_receiver,
self_locators,
) {
discovery.discovery_event_loop(); }
})?;
djh_sender.send(discovery_handle).unwrap_or(());
debug!("Waiting for discovery to start"); match discovery_started_receiver.recv_timeout(Duration::from_secs(10)) {
Ok(Ok(())) => {
info!("Discovery started. Participant constructed.");
Ok(dp)
}
Ok(Err(e)) => {
std::mem::drop(dp);
log_and_err_internal!("Failed to start discovery thread: {:?}", e)
}
Err(e) => log_and_err_internal!("Discovery thread channel error: {:?}", e),
}
}
pub fn create_publisher(&self, qos: &QosPolicies) -> Result<Publisher> {
let w = self.weak_clone(); self.dpi.lock().unwrap().create_publisher(&w, qos)
}
pub fn create_subscriber(&self, qos: &QosPolicies) -> Result<Subscriber> {
let w = self.weak_clone(); self.dpi.lock().unwrap().create_subscriber(&w, qos)
}
pub fn create_topic(
&self,
name: String,
type_desc: String,
qos: &QosPolicies,
topic_kind: TopicKind,
) -> Result<Topic> {
let w = self.weak_clone();
self
.dpi
.lock()
.unwrap()
.create_topic(&w, name, type_desc, qos, topic_kind)
}
pub fn find_topic(&self, name: &str, timeout: Duration) -> Result<Option<Topic>> {
let w = self.weak_clone();
self.dpi.lock().unwrap().find_topic(&w, name, timeout)
}
pub fn domain_id(&self) -> u16 {
self.dpi.lock().unwrap().domain_id()
}
pub fn participant_id(&self) -> u16 {
self.dpi.lock().unwrap().participant_id()
}
pub fn discovered_topics(&self) -> Vec<DiscoveredTopicData> {
self.dpi.lock().unwrap().discovered_topics()
}
pub fn assert_liveliness(self) -> Result<()> {
self.dpi.lock().unwrap().assert_liveliness()
}
pub(crate) fn weak_clone(&self) -> DomainParticipantWeak {
DomainParticipantWeak::new(self, self.guid())
}
pub(crate) fn dds_cache(&self) -> Arc<RwLock<DDSCache>> {
self.dpi.lock().unwrap().dds_cache()
}
pub(crate) fn discovery_db(&self) -> Arc<RwLock<DiscoveryDB>> {
self
.dpi
.lock()
.unwrap()
.dpi
.lock()
.unwrap()
.discovery_db
.clone()
}
pub(crate) fn new_entity_id(&self, entity_kind: EntityKind) -> EntityId {
self.dpi.lock().unwrap().new_entity_id(entity_kind)
}
pub(crate) fn self_locators(&self) -> HashMap<Token, Vec<Locator>> {
self.dpi.lock().unwrap().self_locators()
}
}
impl PartialEq for DomainParticipant {
fn eq(&self, other: &Self) -> bool {
self.guid() == other.guid()
&& self.domain_id() == other.domain_id()
&& self.participant_id() == other.participant_id()
}
}
#[derive(Clone)]
pub struct DomainParticipantWeak {
dpi: Weak<Mutex<DomainParticipantDisc>>,
guid: GUID,
}
impl DomainParticipantWeak {
pub fn new(dp: &DomainParticipant, guid: GUID) -> Self {
Self {
dpi: Arc::downgrade(&dp.dpi),
guid,
}
}
pub fn create_publisher(&self, qos: &QosPolicies) -> Result<Publisher> {
self
.dpi
.upgrade()
.ok_or(Error::OutOfResources)
.and_then(|dpi| dpi.lock().unwrap().create_publisher(self, qos))
}
pub fn create_subscriber(&self, qos: &QosPolicies) -> Result<Subscriber> {
self
.dpi
.upgrade()
.ok_or(Error::OutOfResources)
.and_then(|dpi| dpi.lock().unwrap().create_subscriber(self, qos))
}
pub fn create_topic(
&self,
name: String,
type_desc: String,
qos: &QosPolicies,
topic_kind: TopicKind,
) -> Result<Topic> {
self
.dpi
.upgrade()
.ok_or(Error::LockPoisoned)
.and_then(|dpi| {
dpi
.lock()
.unwrap()
.create_topic(self, name, type_desc, qos, topic_kind)
})
}
pub fn upgrade(self) -> Option<DomainParticipant> {
self.dpi.upgrade().map(|d| DomainParticipant { dpi: d })
}
}
impl RTPSEntity for DomainParticipantWeak {
fn guid(&self) -> GUID {
self.guid
}
}
pub(crate) struct DomainParticipantDisc {
dpi: Arc<Mutex<DomainParticipantInner>>,
discovery_command_sender: mio_channel::SyncSender<DiscoveryCommand>,
discovery_join_handle: mio_channel::Receiver<JoinHandle<()>>,
entity_id_generator: atomic::AtomicU32,
}
impl DomainParticipantDisc {
pub fn new(
domain_id: u16,
discovery_join_handle: mio_channel::Receiver<JoinHandle<()>>,
discovery_update_notification_receiver: mio_channel::Receiver<DiscoveryNotificationType>,
discovery_command_sender: mio_channel::SyncSender<DiscoveryCommand>,
spdp_liveness_sender: mio_channel::SyncSender<GuidPrefix>,
) -> Result<Self> {
let dpi = DomainParticipantInner::new(
domain_id,
discovery_update_notification_receiver,
spdp_liveness_sender,
)?;
Ok(Self {
dpi: Arc::new(Mutex::new(dpi)),
discovery_command_sender,
discovery_join_handle,
entity_id_generator: atomic::AtomicU32::new(0),
})
}
pub(crate) fn new_entity_id(&self, entity_kind: EntityKind) -> EntityId {
let [_goldilocks, papa_byte, mama_byte, baby_byte] = self
.entity_id_generator
.fetch_add(1, atomic::Ordering::Relaxed)
.to_be_bytes();
EntityId::new([papa_byte, mama_byte, baby_byte], entity_kind)
}
pub fn create_publisher(
&self,
dp: &DomainParticipantWeak,
qos: &QosPolicies,
) -> Result<Publisher> {
self
.dpi
.lock()
.unwrap()
.create_publisher(dp, qos, self.discovery_command_sender.clone())
}
pub fn create_subscriber(
&self,
dp: &DomainParticipantWeak,
qos: &QosPolicies,
) -> Result<Subscriber> {
self
.dpi
.lock()
.unwrap()
.create_subscriber(dp, qos, self.discovery_command_sender.clone())
}
pub fn create_topic(
&self,
dp: &DomainParticipantWeak,
name: String,
type_desc: String,
qos: &QosPolicies,
topic_kind: TopicKind,
) -> Result<Topic> {
self
.dpi
.lock()
.unwrap()
.create_topic(dp, name, type_desc, qos, topic_kind)
}
pub fn find_topic(
&self,
dp: &DomainParticipantWeak,
name: &str,
timeout: Duration,
) -> Result<Option<Topic>> {
self.dpi.lock().unwrap().find_topic(dp, name, timeout)
}
pub fn domain_id(&self) -> u16 {
self.dpi.lock().unwrap().domain_id()
}
pub fn participant_id(&self) -> u16 {
self.dpi.lock().unwrap().participant_id()
}
pub fn discovered_topics(&self) -> Vec<DiscoveredTopicData> {
self.dpi.lock().unwrap().discovered_topics()
}
pub(crate) fn dds_cache(&self) -> Arc<RwLock<DDSCache>> {
self.dpi.lock().unwrap().dds_cache()
}
pub(crate) fn assert_liveliness(&self) -> Result<()> {
self
.discovery_command_sender
.send(DiscoveryCommand::ManualAssertLiveliness)
.or_else(|e| {
log_and_err_internal!("assert_liveness - Failed to send DiscoveryCommand. {:?}", e)
})
}
pub(crate) fn self_locators(&self) -> HashMap<Token, Vec<Locator>> {
self.dpi.lock().unwrap().self_locators.clone()
}
}
impl Drop for DomainParticipantDisc {
fn drop(&mut self) {
info!("===== RustDDS shutting down ===== .drop() DomainParticipantDisc");
debug!("Sending Discovery Stop signal.");
if self
.discovery_command_sender
.send(DiscoveryCommand::StopDiscovery)
.is_err()
{
warn!("Failed to send stop signal to Discovery");
return;
}
debug!("Waiting for Discovery join.");
if let Ok(handle) = self.discovery_join_handle.try_recv() {
handle.join().unwrap();
debug!("Joined Discovery.");
}
}
}
pub(crate) struct DomainParticipantInner {
domain_id: u16,
participant_id: u16,
my_guid: GUID,
sender_add_reader: mio_channel::SyncSender<ReaderIngredients>,
sender_remove_reader: mio_channel::SyncSender<GUID>,
stop_poll_sender: mio_channel::Sender<()>,
ev_loop_handle: Option<JoinHandle<()>>,
add_writer_sender: mio_channel::SyncSender<WriterIngredients>,
remove_writer_sender: mio_channel::SyncSender<GUID>,
dds_cache: Arc<RwLock<DDSCache>>,
discovery_db: Arc<RwLock<DiscoveryDB>>,
discovery_db_event_receiver: mio_channel::Receiver<()>,
self_locators: HashMap<Token, Vec<Locator>>,
}
impl Drop for DomainParticipantInner {
fn drop(&mut self) {
if self.stop_poll_sender.send(()).is_err() {
return;
}
debug!("Waiting for dp_event_loop join");
match self.ev_loop_handle.take() {
Some(join_handle) => {
join_handle
.join()
.unwrap_or_else(|e| warn!("Failed to join dp_event_loop: {:?}", e));
}
None => {
error!("Someone managed to steal dp_event_loop join handle from DomainParticipantInner.");
}
}
debug!("Joined dp_event_loop");
}
}
#[allow(clippy::new_without_default)]
impl DomainParticipantInner {
fn new(
domain_id: u16,
discovery_update_notification_receiver: mio_channel::Receiver<DiscoveryNotificationType>,
spdp_liveness_sender: mio_channel::SyncSender<GuidPrefix>,
) -> Result<Self> {
let mut listeners = HashMap::new();
match UDPListener::new_multicast(
"0.0.0.0",
spdp_well_known_multicast_port(domain_id),
Ipv4Addr::new(239, 255, 0, 1),
) {
Ok(l) => {
listeners.insert(DISCOVERY_MUL_LISTENER_TOKEN, l);
}
Err(e) => warn!("Cannot get multicast discovery listener: {:?}", e),
}
let mut participant_id = 0;
let mut discovery_listener = None;
while discovery_listener.is_none() && participant_id < 120 {
discovery_listener = UDPListener::new_unicast(
"0.0.0.0",
spdp_well_known_unicast_port(domain_id, participant_id),
)
.ok();
if discovery_listener.is_none() {
participant_id += 1;
}
}
info!("ParticipantId {} selected.", participant_id);
let discovery_listener = match discovery_listener {
Some(dl) => dl,
None => return log_and_err_internal!("Could not find free ParticipantId"),
};
listeners.insert(DISCOVERY_LISTENER_TOKEN, discovery_listener);
match UDPListener::new_multicast(
"0.0.0.0",
user_traffic_multicast_port(domain_id),
Ipv4Addr::new(239, 255, 0, 1),
) {
Ok(l) => {
listeners.insert(USER_TRAFFIC_MUL_LISTENER_TOKEN, l);
}
Err(e) => warn!("Cannot get multicast user traffic listener: {:?}", e),
}
let user_traffic_listener = UDPListener::new_unicast(
"0.0.0.0",
user_traffic_unicast_port(domain_id, participant_id),
)
.or_else(|e| {
if matches!(e.kind(), ErrorKind::AddrInUse) {
UDPListener::new_unicast("0.0.0.0", 0).or_else(|e| {
log_and_err_internal!(
"Could not open unicast user traffic listener, any port number: {:?}",
e
)
})
} else {
log_and_err_internal!("Could not open unicast user traffic listener: {:?}", e)
}
})?;
listeners.insert(USER_TRAFFIC_LISTENER_TOKEN, user_traffic_listener);
let self_locators: HashMap<Token, Vec<Locator>> = listeners
.iter()
.map(|(t, l)| match l.to_locator_address() {
Ok(locs) => (*t, locs),
Err(e) => {
error!("No local network address for token {:?}: {:?}", t, e);
(*t, vec![])
}
})
.collect();
let (sender_add_reader, receiver_add_reader) =
mio_channel::sync_channel::<ReaderIngredients>(100);
let (sender_remove_reader, receiver_remove_reader) = mio_channel::sync_channel::<GUID>(10);
let (add_writer_sender, add_writer_receiver) =
mio_channel::sync_channel::<WriterIngredients>(10);
let (remove_writer_sender, remove_writer_receiver) = mio_channel::sync_channel::<GUID>(10);
let new_guid = GUID::new_participant_guid();
let domain_info = DomainInfo {
domain_participant_guid: new_guid,
domain_id,
participant_id,
};
let dds_cache = Arc::new(RwLock::new(DDSCache::new()));
let (discovery_db_event_sender, discovery_db_event_receiver) =
mio_channel::sync_channel::<()>(1);
let discovery_db = Arc::new(RwLock::new(DiscoveryDB::new(
new_guid,
discovery_db_event_sender,
)));
let (stop_poll_sender, stop_poll_receiver) = mio_channel::channel::<()>();
let dds_cache_clone = dds_cache.clone();
let disc_db_clone = discovery_db.clone();
let ev_loop_handle = thread::Builder::new()
.name(format!("RustDDS Participant {} event loop", participant_id))
.spawn(move || {
let dp_event_loop = DPEventLoop::new(
domain_info,
listeners,
dds_cache_clone,
disc_db_clone,
new_guid.prefix,
TokenReceiverPair {
token: ADD_READER_TOKEN,
receiver: receiver_add_reader,
},
TokenReceiverPair {
token: REMOVE_READER_TOKEN,
receiver: receiver_remove_reader,
},
TokenReceiverPair {
token: ADD_WRITER_TOKEN,
receiver: add_writer_receiver,
},
TokenReceiverPair {
token: REMOVE_WRITER_TOKEN,
receiver: remove_writer_receiver,
},
stop_poll_receiver,
discovery_update_notification_receiver,
spdp_liveness_sender,
);
dp_event_loop.event_loop();
})?;
info!(
"New DomainParticipantInner: domain_id={:?} participant_id={:?} GUID={:?}",
domain_id, participant_id, new_guid
);
Ok(Self {
domain_id,
participant_id,
my_guid: new_guid,
sender_add_reader,
sender_remove_reader,
stop_poll_sender,
ev_loop_handle: Some(ev_loop_handle),
add_writer_sender,
remove_writer_sender,
dds_cache,
discovery_db,
discovery_db_event_receiver,
self_locators,
})
}
pub fn dds_cache(&self) -> Arc<RwLock<DDSCache>> {
self.dds_cache.clone()
}
pub fn create_publisher(
&self,
domain_participant: &DomainParticipantWeak,
qos: &QosPolicies,
discovery_command: mio_channel::SyncSender<DiscoveryCommand>,
) -> Result<Publisher> {
Ok(Publisher::new(
domain_participant.clone(),
self.discovery_db.clone(),
qos.clone(),
qos.clone(),
self.add_writer_sender.clone(),
self.remove_writer_sender.clone(),
discovery_command,
))
}
pub fn create_subscriber(
&self,
domain_participant: &DomainParticipantWeak,
qos: &QosPolicies,
discovery_command: mio_channel::SyncSender<DiscoveryCommand>,
) -> Result<Subscriber> {
Ok(Subscriber::new(
domain_participant.clone(),
self.discovery_db.clone(),
qos.clone(),
self.sender_add_reader.clone(),
self.sender_remove_reader.clone(),
discovery_command,
))
}
pub fn create_topic(
&self,
domain_participant_weak: &DomainParticipantWeak,
name: String,
type_desc: String,
qos: &QosPolicies,
topic_kind: TopicKind,
) -> Result<Topic> {
let topic = Topic::new(
domain_participant_weak,
name,
TypeDesc::new(type_desc),
qos,
topic_kind,
);
Ok(topic)
}
pub fn find_topic(
&self,
domain_participant_weak: &DomainParticipantWeak,
name: &str,
timeout: Duration,
) -> Result<Option<Topic>> {
let poll = mio::Poll::new()?;
let mut events = mio::Events::with_capacity(1);
poll.register(
&self.discovery_db_event_receiver,
mio::Token(0),
mio::Ready::readable(),
mio::PollOpt::level(),
)?;
let find_end = Instant::now() + timeout;
loop {
if let Some(topic) = self.find_topic_in_discovery_db(domain_participant_weak, name)? {
return Ok(Some(topic));
}
let timeout = find_end - Instant::now();
poll.poll(&mut events, Some(timeout))?;
if let Some(_event) = events.iter().next() {
if self.discovery_db_event_receiver.try_recv().is_ok() {
continue;
}
}
if Instant::now() > find_end {
break;
}
}
Ok(None)
}
fn find_topic_in_discovery_db(
&self,
domain_participant_weak: &DomainParticipantWeak,
name: &str,
) -> Result<Option<Topic>> {
let db = self.discovery_db.read().map_err(|_| Error::LockPoisoned)?;
let build_topic_fn = |d: &DiscoveredTopicData| {
let qos = d.topic_data.qos();
let topic_kind = match d.topic_data.key {
Some(_) => TopicKind::WithKey,
None => TopicKind::NoKey,
};
let name = d.topic_name().clone();
let type_desc = d.topic_data.type_name.clone();
self.create_topic(domain_participant_weak, name, type_desc, &qos, topic_kind)
};
if let Some(d) = db.get_topic(name) {
build_topic_fn(d).map(Some)
} else {
Ok(None)
}
}
pub fn domain_id(&self) -> u16 {
self.domain_id
}
pub fn participant_id(&self) -> u16 {
self.participant_id
}
pub fn discovered_topics(&self) -> Vec<DiscoveredTopicData> {
let db = self
.discovery_db
.read()
.unwrap_or_else(|e| panic!("DiscoveryDB is poisoned. {:?}", e));
db.all_user_topics().cloned().collect()
}
}
impl RTPSEntity for DomainParticipant {
fn guid(&self) -> GUID {
self.dpi.lock().unwrap().guid()
}
}
impl RTPSEntity for DomainParticipantDisc {
fn guid(&self) -> GUID {
self.dpi.lock().unwrap().guid()
}
}
impl RTPSEntity for DomainParticipantInner {
fn guid(&self) -> GUID {
self.my_guid
}
}
impl std::fmt::Debug for DomainParticipant {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("DomainParticipant")
.field("Guid", &self.guid())
.finish()
}
}
#[cfg(test)]
mod tests {
use std::{
collections::BTreeSet,
net::{Ipv4Addr, SocketAddr, SocketAddrV4},
};
use enumflags2::BitFlags;
use log::info;
use speedy::{Endianness, Writable};
use byteorder::LittleEndian;
use crate::{
dds::{qos::QosPolicies, topic::TopicKind},
messages::{
header::Header,
protocol_id::ProtocolId,
protocol_version::ProtocolVersion,
submessages::submessages::{AckNack, EntitySubmessage, SubmessageHeader, SubmessageKind, *},
vendor_id::VendorId,
},
network::{constant::user_traffic_unicast_port, udp_sender::UDPSender},
serialization::{cdr_serializer::CDRSerializerAdapter, submessage::*, Message, SubMessage},
structure::{
guid::{EntityId, GUID},
locator::Locator,
sequence_number::{SequenceNumber, SequenceNumberSet},
},
test::random_data::RandomData,
};
use super::DomainParticipant;
#[test]
fn dp_basic_domain_participant() {
let sender = UDPSender::new(11401).unwrap();
let data: Vec<u8> = vec![0, 1, 2, 3, 4];
let addrs = vec![SocketAddr::new("127.0.0.1".parse().unwrap(), 7412)];
sender.send_to_all(&data, &addrs);
}
#[test]
fn dp_writer_heartbeat_test() {
let domain_participant = DomainParticipant::new(0).expect("Participant creation failed!");
let qos = QosPolicies::qos_none();
let _default_dw_qos = QosPolicies::qos_none();
let publisher = domain_participant
.create_publisher(&qos)
.expect("Failed to create publisher");
let topic = domain_participant
.create_topic(
"Aasii".to_string(),
"RandomData".to_string(),
&qos,
TopicKind::WithKey,
)
.expect("Failed to create topic");
let mut _data_writer = publisher
.create_datawriter::<RandomData, CDRSerializerAdapter<RandomData, LittleEndian>>(&topic, None)
.expect("Failed to create datawriter");
}
#[test]
fn dp_receive_acknack_message_test() {
let domain_participant = DomainParticipant::new(0).expect("Failed to create participant");
let qos = QosPolicies::qos_none();
let _default_dw_qos = QosPolicies::qos_none();
let publisher = domain_participant
.create_publisher(&qos)
.expect("Failed to create publisher");
let topic = domain_participant
.create_topic(
"Aasii".to_string(),
"Huh?".to_string(),
&qos,
TopicKind::WithKey,
)
.expect("Failed to create topic");
let mut _data_writer = publisher
.create_datawriter::<RandomData, CDRSerializerAdapter<RandomData, LittleEndian>>(&topic, None)
.expect("Failed to create datawriter");
let port_number: u16 = user_traffic_unicast_port(5, 0);
let sender = UDPSender::new(1234).unwrap();
let mut m: Message = Message::default();
let a: AckNack = AckNack {
reader_id: EntityId::SPDP_BUILTIN_PARTICIPANT_READER,
writer_id: EntityId::SPDP_BUILTIN_PARTICIPANT_WRITER,
reader_sn_state: SequenceNumberSet::from_base_and_set(
SequenceNumber::default(),
&BTreeSet::new(),
),
count: 1,
};
let flags = BitFlags::<ACKNACK_Flags>::from_endianness(Endianness::BigEndian);
let sub_header: SubmessageHeader = SubmessageHeader {
kind: SubmessageKind::ACKNACK,
flags: flags.bits(),
content_length: 24,
};
let s: SubMessage = SubMessage {
header: sub_header,
body: SubmessageBody::Entity(EntitySubmessage::AckNack(a, flags)),
};
let h = Header {
protocol_id: ProtocolId::default(),
protocol_version: ProtocolVersion { major: 2, minor: 3 },
vendor_id: VendorId::THIS_IMPLEMENTATION,
guid_prefix: GUID::default().prefix,
};
m.set_header(h);
m.add_submessage(s);
let _data: Vec<u8> = m.write_to_vec_with_ctx(Endianness::LittleEndian).unwrap();
info!("data to send via udp: {:?}", _data);
let ip = Ipv4Addr::from([0x00, 0x00, 0x00, 0x00]);
let socket_address = SocketAddrV4::new(ip, port_number);
let locators = vec![Locator::UdpV4(socket_address)];
sender.send_to_locator_list(&_data, &locators);
}
}