use std::{
collections::{HashMap, HashSet},
sync::{Arc, RwLock},
};
#[allow(unused_imports)]
use log::{debug, error, info, trace, warn};
use mio_extras::channel as mio_channel;
use crate::{
create_security_error_and_log,
dds::{
no_key,
participant::DomainParticipantWeak,
with_key::{DataSample, Sample, WriteOptionsBuilder},
WriteError,
},
qos, rpc,
rtps::constant::{
DiscoveryNotificationType, SECURE_BUILTIN_READERS_INIT_LIST, SECURE_BUILTIN_WRITERS_INIT_LIST,
},
security::{
access_control::{EndpointSecurityAttributes, ParticipantSecurityAttributes, PermissionsToken},
authentication::{
authentication_builtin::DiscHandshakeState, HandshakeMessageToken, IdentityToken,
ValidationOutcome, GMCLASSID_SECURITY_AUTH_HANDSHAKE,
},
cryptographic::{
CryptoToken, GMCLASSID_SECURITY_DATAREADER_CRYPTO_TOKENS,
GMCLASSID_SECURITY_DATAWRITER_CRYPTO_TOKENS, GMCLASSID_SECURITY_PARTICIPANT_CRYPTO_TOKENS,
},
security_error,
security_plugins::SecurityPluginsHandle,
DataHolder, ParticipantBuiltinTopicDataSecure, ParticipantGenericMessage,
ParticipantSecurityInfo, ParticipantStatelessMessage, ParticipantVolatileMessageSecure,
PublicationBuiltinTopicDataSecure, SecurityError, SecurityResult,
SubscriptionBuiltinTopicDataSecure,
},
security_info, security_warn,
serialization::pl_cdr_adapters::PlCdrSerialize,
structure::{
entity::RTPSEntity,
guid::{EntityId, GuidPrefix},
},
with_key::{self, DataWriterCdr},
RepresentationIdentifier, SequenceNumber, GUID,
};
use super::{
discovery::{DataWriterPlCdr, NormalDiscoveryPermission},
discovery_db::{discovery_db_read, discovery_db_write, DiscoveryDB},
spdp_participant_data, DiscoveredReaderData, DiscoveredTopicData, DiscoveredWriterData,
ParticipantMessageData, Participant_GUID, SpdpDiscoveredParticipantData,
};
#[derive(Clone, Copy, PartialEq, Debug)]
pub enum AuthenticationStatus {
Authenticated,
Authenticating, Unauthenticated,
Rejected, }
const STORED_AUTH_MESSAGE_MAX_RESEND_COUNT: u8 = 10;
struct StoredAuthenticationMessage {
message: ParticipantStatelessMessage,
remaining_resend_counter: u8,
}
impl StoredAuthenticationMessage {
pub fn new(message: ParticipantStatelessMessage) -> Self {
Self {
message,
remaining_resend_counter: STORED_AUTH_MESSAGE_MAX_RESEND_COUNT,
}
}
}
pub(crate) struct SecureDiscovery {
pub security_plugins: SecurityPluginsHandle,
pub domain_id: u16,
pub local_participant_guid: GUID,
pub local_dp_identity_token: IdentityToken,
pub local_dp_permissions_token: PermissionsToken,
pub local_dp_property_qos: qos::policy::Property,
pub local_dp_sec_attributes: ParticipantSecurityAttributes,
generic_message_helper: ParticipantGenericMessageHelper,
handshake_states: HashMap<GuidPrefix, DiscHandshakeState>,
stored_authentication_messages: HashMap<GuidPrefix, StoredAuthenticationMessage>,
cached_key_exchange_messages_for_resend: HashSet<ParticipantVolatileMessageSecure>,
cached_received_key_exchange_messages: HashMap<(GUID, GUID), ParticipantVolatileMessageSecure>,
user_data_endpoints_with_keys_already_sent_to: HashSet<GUID>,
relay_only_remote_readers: HashSet<GUID>,
}
impl SecureDiscovery {
pub fn new(
domain_participant: &DomainParticipantWeak,
discovery_db: &Arc<RwLock<DiscoveryDB>>,
security_plugins: SecurityPluginsHandle,
) -> SecurityResult<Self> {
let mut plugins = security_plugins.lock().unwrap();
let participant_guid_prefix = domain_participant.guid().prefix;
let property_qos = domain_participant
.qos()
.property()
.expect("No property QoS defined even though security is enabled");
let identity_token = plugins
.get_identity_token(participant_guid_prefix)
.map_err(|e| create_security_error_and_log!("Failed to get IdentityToken: {}", e))?;
let _identity_status_token = plugins
.get_identity_status_token(participant_guid_prefix)
.map_err(|e| create_security_error_and_log!("Failed to get IdentityStatusToken: {}", e))?;
let permissions_token = plugins
.get_permissions_token(participant_guid_prefix)
.map_err(|e| create_security_error_and_log!("Failed to get PermissionsToken: {}", e))?;
let credential_token = plugins
.get_permissions_credential_token(participant_guid_prefix)
.map_err(|e| {
create_security_error_and_log!("Failed to get PermissionsCredentialToken: {}", e)
})?;
plugins
.set_permissions_credential_and_token(
participant_guid_prefix,
credential_token,
permissions_token.clone(),
)
.map_err(|e| create_security_error_and_log!("Failed to set permission tokens: {}", e))?;
let security_attributes = plugins
.get_participant_sec_attributes(participant_guid_prefix)
.map_err(|e| {
create_security_error_and_log!("Failed to get ParticipantSecurityAttributes: {}", e)
})?;
drop(plugins);
register_remote_to_crypto(
participant_guid_prefix,
participant_guid_prefix,
&security_plugins,
)
.map_err(|e| {
create_security_error_and_log!(
"Failed to register local participant as remote to crypto plugin: {}",
e
)
})?;
info!("Registered local participant as remote to crypto plugin");
let mut plugins = security_plugins.get_plugins();
plugins
.create_local_participant_crypto_tokens(participant_guid_prefix)
.and_then(|tokens| {
plugins.set_remote_participant_crypto_tokens(participant_guid_prefix, tokens)
})
.map_err(|e| {
create_security_error_and_log!(
"Failed to set local participant crypto tokens as remote tokens: {}",
e
)
})?;
for (writer_eid, reader_eid, _reader_endpoint, _reader_qos) in SECURE_BUILTIN_READERS_INIT_LIST
{
if *writer_eid != EntityId::P2P_BUILTIN_PARTICIPANT_VOLATILE_SECURE_WRITER {
let local_writer_guid = GUID::new(participant_guid_prefix, *writer_eid);
let local_reader_guid = GUID::new(participant_guid_prefix, *reader_eid);
plugins
.create_local_writer_crypto_tokens(local_writer_guid, local_reader_guid)
.and_then(|tokens| {
plugins.set_remote_writer_crypto_tokens(local_writer_guid, local_reader_guid, tokens)
})
.map_err(|e| {
create_security_error_and_log!(
"Failed to set local writer {:?} crypto tokens as remote tokens: {}.",
writer_eid,
e
)
})?;
plugins
.create_local_reader_crypto_tokens(local_reader_guid, local_writer_guid)
.and_then(|tokens| {
plugins.set_remote_reader_crypto_tokens(local_reader_guid, local_writer_guid, tokens)
})
.map_err(|e| {
create_security_error_and_log!(
"Failed to set local reader {:?} crypto tokens as remote tokens: {}.",
reader_eid,
e
)
})?;
}
}
info!("Completed setting local crypto tokens as remote tokens");
discovery_db_write(discovery_db)
.update_authentication_status(participant_guid_prefix, AuthenticationStatus::Authenticated);
drop(plugins);
Ok(Self {
security_plugins,
domain_id: domain_participant.domain_id(),
local_participant_guid: domain_participant.guid(),
local_dp_identity_token: identity_token,
local_dp_permissions_token: permissions_token,
local_dp_property_qos: property_qos,
local_dp_sec_attributes: security_attributes,
generic_message_helper: ParticipantGenericMessageHelper::new(),
handshake_states: HashMap::new(),
cached_key_exchange_messages_for_resend: HashSet::new(),
stored_authentication_messages: HashMap::new(),
cached_received_key_exchange_messages: HashMap::new(),
user_data_endpoints_with_keys_already_sent_to: HashSet::new(),
relay_only_remote_readers: HashSet::new(),
})
}
pub fn participant_read(
&mut self,
ds: &DataSample<SpdpDiscoveredParticipantData>,
discovery_db: &Arc<RwLock<DiscoveryDB>>,
discovery_updated_sender: &mio_channel::SyncSender<DiscoveryNotificationType>,
auth_msg_writer: &no_key::DataWriter<ParticipantStatelessMessage>,
) -> NormalDiscoveryPermission {
match &ds.value {
Sample::Value(participant_data) => self.participant_data_read(
participant_data,
discovery_db,
discovery_updated_sender,
auth_msg_writer,
),
Sample::Dispose(participant_guid) => {
self.participant_dispose_read(participant_guid, discovery_db)
}
}
}
fn participant_data_read(
&mut self,
participant_data: &SpdpDiscoveredParticipantData,
discovery_db: &Arc<RwLock<DiscoveryDB>>,
discovery_updated_sender: &mio_channel::SyncSender<DiscoveryNotificationType>,
auth_msg_writer: &no_key::DataWriter<ParticipantStatelessMessage>,
) -> NormalDiscoveryPermission {
let guid_prefix = participant_data.participant_guid.prefix;
let auth_status_opt = discovery_db_read(discovery_db).get_authentication_status(guid_prefix);
let updated_auth_status = match auth_status_opt {
None => {
let compatible = self.check_compatibility_with_remote_participant(participant_data);
if compatible {
self.start_authentication_with_remote(
participant_data,
discovery_db,
discovery_updated_sender,
auth_msg_writer,
)
} else {
if self
.local_dp_sec_attributes
.allow_unauthenticated_participants
{
security_info!(
"Remote participant has incompatible Security, but matching with it anyways, since \
configuration allows this. Remote guid: {:?}",
participant_data.participant_guid
);
AuthenticationStatus::Unauthenticated
} else {
security_info!(
"Remote participant has incompatible Security, not matching with it. Remote guid: \
{:?}",
participant_data.participant_guid
);
AuthenticationStatus::Rejected
}
}
}
Some(AuthenticationStatus::Authenticating) => {
if let Some(DiscHandshakeState::PendingRequestSend) = self.get_handshake_state(&guid_prefix)
{
self.try_sending_new_handshake_request_message(
guid_prefix,
discovery_db,
auth_msg_writer,
);
}
info!("Authenticating with Participant {guid_prefix:?}");
AuthenticationStatus::Authenticating
}
Some(other_status) => {
other_status
}
};
discovery_db_write(discovery_db).update_authentication_status(guid_prefix, updated_auth_status);
if updated_auth_status == AuthenticationStatus::Unauthenticated {
NormalDiscoveryPermission::Allow
} else {
NormalDiscoveryPermission::Deny
}
}
fn participant_dispose_read(
&self,
participant_guid: &Participant_GUID,
discovery_db: &Arc<RwLock<DiscoveryDB>>,
) -> NormalDiscoveryPermission {
let guid_prefix = participant_guid.0.prefix;
let db = discovery_db_read(discovery_db);
match db.get_authentication_status(guid_prefix) {
None => {
NormalDiscoveryPermission::Allow
}
Some(AuthenticationStatus::Unauthenticated) => {
NormalDiscoveryPermission::Allow
}
Some(other_status) => {
debug!(
"Received a dispose message from participant with authentication status: \
{other_status:?}. Ignoring. Participant guid prefix: {guid_prefix:?}"
);
NormalDiscoveryPermission::Deny
}
}
}
pub fn check_nonsecure_subscription_read(
&mut self,
sample: &with_key::Sample<DiscoveredReaderData, GUID>,
discovery_db: &Arc<RwLock<DiscoveryDB>>,
) -> NormalDiscoveryPermission {
let (topic_name, participant_guidp) = match sample {
Sample::Value(reader_data) => (
reader_data.subscription_topic_data.topic_name().clone(),
reader_data.reader_proxy.remote_reader_guid.prefix,
),
Sample::Dispose(reader_guid) => {
if let Some(reader) = discovery_db_read(discovery_db).get_topic_reader(reader_guid) {
(
reader.subscription_topic_data.topic_name().clone(),
reader_guid.prefix,
)
} else {
return NormalDiscoveryPermission::Deny;
}
}
};
let topic_sec_attributes = match self
.security_plugins
.get_plugins()
.get_topic_sec_attributes(participant_guidp, &topic_name)
{
Ok(attr) => attr,
Err(e) => {
create_security_error_and_log!(
"Failed to get topic security attributes: {}. Topic: {topic_name}",
e
);
return NormalDiscoveryPermission::Deny;
}
};
if topic_sec_attributes.is_discovery_protected {
security_info!(
"Received a non-secure DCPSSubscription message for topic {topic_name} whose discovery is \
protected. Ignoring message. Participant: {:?}",
participant_guidp
);
return NormalDiscoveryPermission::Deny;
}
let auth_status = discovery_db_read(discovery_db).get_authentication_status(participant_guidp);
match sample {
Sample::Value(reader_data) => {
match auth_status {
Some(AuthenticationStatus::Unauthenticated) => {
if topic_sec_attributes.is_read_protected {
security_info!(
"Unauthenticated participant {:?} attempted to read protected topic {topic_name}. \
Rejecting.",
participant_guidp
);
NormalDiscoveryPermission::Deny
} else {
security_info!(
"Unauthenticated participant {:?} wants to read unprotected topic {topic_name}. \
Allowing.",
participant_guidp
);
NormalDiscoveryPermission::Allow
}
}
Some(AuthenticationStatus::Authenticated) => {
if topic_sec_attributes.is_read_protected {
match self
.security_plugins
.get_plugins()
.check_remote_datareader_from_nonsecure(
participant_guidp,
self.domain_id,
reader_data,
) {
Ok((check_passed, relay_only)) => {
if check_passed {
security_info!(
"Access control check passed for authenticated participant {:?} to read \
topic {topic_name}.",
participant_guidp
);
if relay_only {
self
.relay_only_remote_readers
.insert(reader_data.reader_proxy.remote_reader_guid);
}
NormalDiscoveryPermission::Allow
} else {
security_info!(
"Access control check did not pass for authenticated participant {:?} to \
read topic {topic_name}. Rejecting.",
participant_guidp
);
NormalDiscoveryPermission::Deny
}
}
Err(e) => {
create_security_error_and_log!(
"Something went wrong in checking permissions of a remote datareader: {}. \
Topic: {topic_name}",
e
);
NormalDiscoveryPermission::Deny
}
}
} else {
security_info!(
"Authenticated participant {:?} wants to read unprotected topic {topic_name}. \
Allowing.",
participant_guidp
);
NormalDiscoveryPermission::Allow
}
}
other => {
security_info!(
"Received a DCPSSubscription message from a participant with authentication status: \
{:?}. Ignoring message. Participant: {:?}",
other,
participant_guidp
);
NormalDiscoveryPermission::Deny
}
}
}
Sample::Dispose(_reader_guid) => {
match auth_status {
Some(AuthenticationStatus::Unauthenticated)
| Some(AuthenticationStatus::Authenticated) => {
security_info!(
"Participant {:?} with authentication status {:?} disposes its reader in topic \
{topic_name}.",
participant_guidp,
auth_status,
);
NormalDiscoveryPermission::Allow
}
other_status => {
security_info!(
"Participant {:?} with authentication status {:?} attempts to disposes its reader \
in topic {topic_name}. Rejecting.",
other_status,
participant_guidp
);
NormalDiscoveryPermission::Deny
}
}
}
}
}
pub fn check_nonsecure_publication_read(
&mut self,
sample: &Sample<DiscoveredWriterData, GUID>,
discovery_db: &Arc<RwLock<DiscoveryDB>>,
) -> NormalDiscoveryPermission {
let (topic_name, participant_guidp) = match sample {
Sample::Value(writer_data) => (
writer_data.publication_topic_data.topic_name().clone(),
writer_data.writer_proxy.remote_writer_guid.prefix,
),
Sample::Dispose(writer_guid) => {
if let Some(writer) = discovery_db_read(discovery_db).get_topic_writer(writer_guid) {
(
writer.publication_topic_data.topic_name().clone(),
writer_guid.prefix,
)
} else {
return NormalDiscoveryPermission::Deny;
}
}
};
let topic_sec_attributes = match self
.security_plugins
.get_plugins()
.get_topic_sec_attributes(participant_guidp, &topic_name)
{
Ok(attr) => attr,
Err(e) => {
create_security_error_and_log!(
"Failed to get topic security attributes: {}. Topic: {topic_name}",
e
);
return NormalDiscoveryPermission::Deny;
}
};
if topic_sec_attributes.is_discovery_protected {
security_info!(
"Received a non-secure DCPSPublication message for topic {topic_name} whose discovery is \
protected. Ignoring message. Participant: {:?}",
participant_guidp
);
return NormalDiscoveryPermission::Deny;
}
let auth_status = discovery_db_read(discovery_db).get_authentication_status(participant_guidp);
match sample {
Sample::Value(writer_data) => {
match auth_status {
Some(AuthenticationStatus::Unauthenticated) => {
if topic_sec_attributes.is_write_protected {
security_info!(
"Unauthenticated participant {:?} attempted to publish to protected topic \
{topic_name}. Rejecting.",
participant_guidp
);
NormalDiscoveryPermission::Deny
} else {
security_info!(
"Unauthenticated participant {:?} wants to publish to unprotected topic \
{topic_name}. Allowing.",
participant_guidp
);
NormalDiscoveryPermission::Allow
}
}
Some(AuthenticationStatus::Authenticated) => {
if topic_sec_attributes.is_write_protected {
match self
.security_plugins
.get_plugins()
.check_remote_datawriter_from_nonsecure(
participant_guidp,
self.domain_id,
writer_data,
) {
Ok(check_passed) => {
if check_passed {
security_info!(
"Access control check passed for authenticated participant {:?} to publish \
to topic {topic_name}.",
participant_guidp
);
NormalDiscoveryPermission::Allow
} else {
security_info!(
"Access control check did not pass for authenticated participant {:?} to \
publish to topic {topic_name}. Rejecting.",
participant_guidp
);
NormalDiscoveryPermission::Deny
}
}
Err(e) => {
create_security_error_and_log!(
"Something went wrong in checking permissions of a remote DataWriter: {}. \
Topic: {topic_name}",
e
);
NormalDiscoveryPermission::Deny
}
}
} else {
security_info!(
"Authenticated participant {:?} wants to publish to unprotected topic \
{topic_name}. Allowing.",
participant_guidp
);
NormalDiscoveryPermission::Allow
}
}
other => {
security_info!(
"Received a DCPSPublication message from a participant with authentication status: \
{:?}. Ignoring message. Participant: {:?}",
other,
participant_guidp
);
NormalDiscoveryPermission::Deny
}
}
}
Sample::Dispose(_writer_guid) => {
match auth_status {
Some(AuthenticationStatus::Unauthenticated)
| Some(AuthenticationStatus::Authenticated) => {
security_info!(
"Participant {:?} with authentication status {:?} disposes its writer in topic \
{topic_name}.",
participant_guidp,
auth_status,
);
NormalDiscoveryPermission::Allow
}
other_status => {
security_info!(
"Participant {:?} with authentication status {:?} attempts to disposes its writer \
in topic {topic_name}. Rejecting.",
other_status,
participant_guidp
);
NormalDiscoveryPermission::Deny
}
}
}
}
}
pub fn check_topic_read(
&mut self,
sample: &with_key::Sample<(DiscoveredTopicData, GUID), GUID>,
discovery_db: &Arc<RwLock<DiscoveryDB>>,
) -> NormalDiscoveryPermission {
let participant_guidp = match sample {
Sample::Value((_topic_data, guid)) => guid.prefix,
Sample::Dispose(guid) => guid.prefix,
};
let auth_status = discovery_db_read(discovery_db).get_authentication_status(participant_guidp);
if auth_status != Some(AuthenticationStatus::Authenticated) {
security_warn!(
"DCPSTopic data from non-authenticated participant {:?}",
participant_guidp
);
return NormalDiscoveryPermission::Deny;
}
match sample {
Sample::Value((disc_topic, _guid)) => {
match self.security_plugins.get_plugins().check_remote_topic(
participant_guidp,
self.domain_id,
&disc_topic.topic_data,
) {
Ok(check_passed) => {
if check_passed {
security_info!(
"Access control check passed for participant {:?} to create topic {}.",
participant_guidp,
disc_topic.topic_data.name
);
NormalDiscoveryPermission::Allow
} else {
security_info!(
"Access control check did not pass for participant {:?} to create topic {}.",
participant_guidp,
disc_topic.topic_data.name
);
NormalDiscoveryPermission::Deny
}
}
Err(e) => {
create_security_error_and_log!(
"Something went wrong in checking remote permissions for topic {}: {:?}",
disc_topic.topic_data.name,
e
);
NormalDiscoveryPermission::Deny
}
}
}
Sample::Dispose(_guid) => {
NormalDiscoveryPermission::Allow
}
}
}
pub fn secure_participant_read(
&mut self,
ds: &with_key::Sample<
ParticipantBuiltinTopicDataSecure,
spdp_participant_data::Participant_GUID,
>,
discovery_db: &Arc<RwLock<DiscoveryDB>>,
_discovery_updated_sender: &mio_channel::SyncSender<DiscoveryNotificationType>,
) -> NormalDiscoveryPermission {
let guidp = match ds {
with_key::Sample::Value(data) => data.participant_data.participant_guid.prefix,
with_key::Sample::Dispose(pguid) => pguid.0.prefix,
};
let auth_status = discovery_db_read(discovery_db).get_authentication_status(guidp);
if auth_status != Some(AuthenticationStatus::Authenticated) {
security_warn!(
"Received a DCPSParticipantsSecure message from a non-authenticated participant. Auth \
status: {:?}",
auth_status
);
return NormalDiscoveryPermission::Deny;
}
match ds {
with_key::Sample::Value(_data) => {
NormalDiscoveryPermission::Allow
}
with_key::Sample::Dispose(_pguid) => {
NormalDiscoveryPermission::Allow
}
}
}
pub fn secure_spdp_publish(
&self,
secure_participant_writer: &DataWriterPlCdr<ParticipantBuiltinTopicDataSecure>,
participant_data: SpdpDiscoveredParticipantData,
) {
let participant_secure_data = ParticipantBuiltinTopicDataSecure {
participant_data,
identity_status_token_opt: None, };
if let Err(e) = secure_participant_writer.write(participant_secure_data, None) {
error!("Publishing to ParticipantBuiltinTopicDataSecure failed: {e:?}");
}
}
pub fn sedp_publish_single_reader(
&self,
nonsecure_sub_writer: &DataWriterPlCdr<DiscoveredReaderData>,
secure_sub_writer: &DataWriterPlCdr<SubscriptionBuiltinTopicDataSecure>,
local_user_reader: &DiscoveredReaderData,
) {
let do_secure_write =
if let Some(sec_info) = local_user_reader.subscription_topic_data.security_info() {
let sec_attributes = EndpointSecurityAttributes::from(sec_info.clone());
sec_attributes
.topic_security_attributes
.is_discovery_protected
} else {
false
};
if do_secure_write {
let sec_sub_data = SubscriptionBuiltinTopicDataSecure::from((*local_user_reader).clone());
if let Err(e) = secure_sub_writer.write(sec_sub_data, None) {
error!("Failed to write subscription to DCPSSubscriptionsSecure: {e}");
} else {
security_info!(
"Published DCPSSubscriptionsSecure data on topic {}, reader guid {:?}",
local_user_reader.subscription_topic_data.topic_name(),
local_user_reader.reader_proxy.remote_reader_guid
);
}
} else {
if let Err(e) = nonsecure_sub_writer.write((*local_user_reader).clone(), None) {
error!("Failed to write subscription to DCPSSubscriptions: {e}");
} else {
debug!(
"Published DCPSSubscriptions data on topic {}, reader guid {:?}",
local_user_reader.subscription_topic_data.topic_name(),
local_user_reader.reader_proxy.remote_reader_guid
);
}
}
}
pub fn sedp_publish_single_writer(
&self,
nonsecure_pub_writer: &DataWriterPlCdr<DiscoveredWriterData>,
secure_pub_writer: &DataWriterPlCdr<PublicationBuiltinTopicDataSecure>,
local_user_writer: &DiscoveredWriterData,
) {
let do_secure_write =
if let Some(sec_info) = local_user_writer.publication_topic_data.security_info() {
let sec_attributes = EndpointSecurityAttributes::from(sec_info.clone());
sec_attributes
.topic_security_attributes
.is_discovery_protected
} else {
false
};
if do_secure_write {
let sec_pub_data = PublicationBuiltinTopicDataSecure::from((*local_user_writer).clone());
if let Err(e) = secure_pub_writer.write(sec_pub_data, None) {
error!("Failed to write publication to DCPSPublicationsSecure: {e}");
} else {
security_info!(
"Published DCPSPublicationsSecure data on topic {}, writer guid {:?}",
local_user_writer.publication_topic_data.topic_name(),
local_user_writer.writer_proxy.remote_writer_guid
);
}
} else {
if let Err(e) = nonsecure_pub_writer.write((*local_user_writer).clone(), None) {
error!("Failed to write publication to DCPSPublications: {e}");
} else {
debug!(
"Published DCPSPublication data on topic {}, writer guid {:?}",
local_user_writer.publication_topic_data.topic_name(),
local_user_writer.writer_proxy.remote_writer_guid
);
}
}
}
pub fn write_liveness_message(
&self,
secure_writer: &DataWriterCdr<ParticipantMessageData>,
nonsecure_writer: &DataWriterCdr<ParticipantMessageData>,
msg: ParticipantMessageData,
) -> Result<(), WriteError<ParticipantMessageData>> {
if self.local_dp_sec_attributes.is_liveliness_protected {
secure_writer.write(msg, None)
} else {
nonsecure_writer.write(msg, None)
}
}
pub fn check_secure_subscription_read(
&mut self,
sample: &with_key::Sample<SubscriptionBuiltinTopicDataSecure, GUID>,
discovery_db: &Arc<RwLock<DiscoveryDB>>,
) -> NormalDiscoveryPermission {
let (topic_name, participant_guidp) = match sample {
Sample::Value(sub_data) => (
sub_data
.discovered_reader_data
.subscription_topic_data
.topic_name()
.clone(),
sub_data
.discovered_reader_data
.reader_proxy
.remote_reader_guid
.prefix,
),
Sample::Dispose(reader_guid) => {
if let Some(reader) = discovery_db_read(discovery_db).get_topic_reader(reader_guid) {
(
reader.subscription_topic_data.topic_name().clone(),
reader_guid.prefix,
)
} else {
return NormalDiscoveryPermission::Deny;
}
}
};
let auth_status = discovery_db_read(discovery_db).get_authentication_status(participant_guidp);
if auth_status != Some(AuthenticationStatus::Authenticated) {
security_warn!(
"DCPSSubscriptionsSecure data from non-authenticated participant {:?}",
participant_guidp
);
return NormalDiscoveryPermission::Deny;
}
let topic_sec_attributes = match self
.security_plugins
.get_plugins()
.get_topic_sec_attributes(participant_guidp, &topic_name)
{
Ok(attr) => attr,
Err(e) => {
create_security_error_and_log!(
"Failed to get topic security attributes: {}. Topic: {topic_name}",
e
);
return NormalDiscoveryPermission::Deny;
}
};
match sample {
Sample::Value(sub_data) => {
if topic_sec_attributes.is_read_protected {
match self
.security_plugins
.get_plugins()
.check_remote_datareader_from_secure(participant_guidp, self.domain_id, sub_data)
{
Ok((check_passed, relay_only)) => {
if check_passed {
security_info!(
"Access control check passed for authenticated participant {:?} to read topic \
{topic_name}.",
participant_guidp
);
if relay_only {
self.relay_only_remote_readers.insert(
sub_data
.discovered_reader_data
.reader_proxy
.remote_reader_guid,
);
}
NormalDiscoveryPermission::Allow
} else {
security_info!(
"Access control check did not pass for authenticated participant {:?} to read \
topic {topic_name}. Rejecting.",
participant_guidp
);
NormalDiscoveryPermission::Deny
}
}
Err(e) => {
create_security_error_and_log!(
"Something went wrong in checking permissions of a remote datareader: {}. Topic: \
{topic_name}",
e
);
NormalDiscoveryPermission::Deny
}
}
} else {
security_info!(
"Authenticated participant {:?} wants to read unprotected topic {topic_name}. \
Allowing.",
participant_guidp
);
NormalDiscoveryPermission::Allow
}
}
Sample::Dispose(_reader_guid) => {
NormalDiscoveryPermission::Allow
}
}
}
pub fn check_secure_publication_read(
&mut self,
sample: &with_key::Sample<PublicationBuiltinTopicDataSecure, GUID>,
discovery_db: &Arc<RwLock<DiscoveryDB>>,
) -> NormalDiscoveryPermission {
let (topic_name, participant_guidp) = match sample {
Sample::Value(pub_data) => (
pub_data
.discovered_writer_data
.publication_topic_data
.topic_name()
.clone(),
pub_data
.discovered_writer_data
.writer_proxy
.remote_writer_guid
.prefix,
),
Sample::Dispose(writer_guid) => {
if let Some(writer) = discovery_db_read(discovery_db).get_topic_writer(writer_guid) {
(
writer.publication_topic_data.topic_name().clone(),
writer_guid.prefix,
)
} else {
return NormalDiscoveryPermission::Deny;
}
}
};
let auth_status = discovery_db_read(discovery_db).get_authentication_status(participant_guidp);
if auth_status != Some(AuthenticationStatus::Authenticated) {
security_warn!(
"DCPSPublicationsSecure data from non-authenticated participant {:?}",
participant_guidp
);
return NormalDiscoveryPermission::Deny;
}
let topic_sec_attributes = match self
.security_plugins
.get_plugins()
.get_topic_sec_attributes(participant_guidp, &topic_name)
{
Ok(attr) => attr,
Err(e) => {
create_security_error_and_log!(
"Failed to get topic security attributes: {}. Topic: {topic_name}",
e
);
return NormalDiscoveryPermission::Deny;
}
};
match sample {
Sample::Value(pub_data) => {
if topic_sec_attributes.is_write_protected {
match self
.security_plugins
.get_plugins()
.check_remote_datawriter_from_secure(participant_guidp, self.domain_id, pub_data)
{
Ok(check_passed) => {
if check_passed {
security_info!(
"Access control check passed for authenticated participant {:?} to publish to \
topic {topic_name}.",
participant_guidp
);
NormalDiscoveryPermission::Allow
} else {
security_info!(
"Access control check did not pass for authenticated participant {:?} to \
publish to topic {topic_name}. Rejecting.",
participant_guidp
);
NormalDiscoveryPermission::Deny
}
}
Err(e) => {
create_security_error_and_log!(
"Something went wrong in checking permissions of a remote DataWriter: {}. Topic: \
{topic_name}",
e
);
NormalDiscoveryPermission::Deny
}
}
} else {
security_info!(
"Authenticated participant {:?} wants to publish to unprotected topic {topic_name}. \
Allowing.",
participant_guidp
);
NormalDiscoveryPermission::Allow
}
}
Sample::Dispose(_writer_guid) => {
NormalDiscoveryPermission::Allow
}
}
}
fn check_compatibility_with_remote_participant(
&self,
remote_data: &SpdpDiscoveredParticipantData,
) -> bool {
if let Some(token) = remote_data.identity_token.as_ref() {
let my_class_id = self.local_dp_identity_token.class_id();
let remote_class_id = token.class_id();
if my_class_id != remote_class_id {
info!(
"Participants not compatible because of different IdentityToken class IDs. Local \
id:{my_class_id}, remote id: {remote_class_id}"
);
return false;
}
} else {
info!("Participants not compatible because remote does not have IdentityToken");
return false;
}
if let Some(token) = remote_data.permissions_token.as_ref() {
let my_class_id = self.local_dp_permissions_token.class_id();
let remote_class_id = token.class_id();
if my_class_id != remote_class_id {
info!(
"Participants not compatible because of different PermissionsToken class IDs. Local \
id:{my_class_id}, remote id: {remote_class_id}"
);
return false;
}
} else {
info!("Participants not compatible because remote does not have PermissionsToken");
return false;
}
if let Some(remote_sec_info) = remote_data.security_info.as_ref() {
let my_sec_info = ParticipantSecurityInfo::from(self.local_dp_sec_attributes.clone());
let my_mask = my_sec_info.participant_security_attributes;
let remote_mask = remote_sec_info.participant_security_attributes;
let my_plugin_mask = my_sec_info.plugin_participant_security_attributes;
let remote_plugin_mask = remote_sec_info.plugin_participant_security_attributes;
if my_mask.is_valid()
&& remote_mask.is_valid()
&& my_plugin_mask.is_valid()
&& remote_plugin_mask.is_valid()
{
if my_sec_info != *remote_sec_info {
info!("Participants not compatible because of unequal ParticipantSecurityInfos");
return false;
}
} else {
info!(
"Participants not compatible because some ParticipantSecurityInfo masks are not valid"
);
return false;
}
} else {
info!("Participants not compatible because remote does not have ParticipantSecurityInfo");
return false;
}
true
}
fn start_authentication_with_remote(
&mut self,
participant_data: &SpdpDiscoveredParticipantData,
discovery_db: &Arc<RwLock<DiscoveryDB>>,
discovery_updated_sender: &mio_channel::SyncSender<DiscoveryNotificationType>,
auth_msg_writer: &no_key::DataWriter<ParticipantStatelessMessage>,
) -> AuthenticationStatus {
let my_guid = self.local_participant_guid;
let remote_guid = participant_data.participant_guid;
let remote_identity_token = match participant_data.identity_token.as_ref() {
Some(token) => token.clone(),
None => {
create_security_error_and_log!(
"SpdpDiscoveredParticipantData is missing the Identity token"
);
return AuthenticationStatus::Rejected;
}
};
let outcome: ValidationOutcome = match self
.security_plugins
.get_plugins()
.validate_remote_identity(
my_guid.prefix,
remote_identity_token,
remote_guid.prefix,
None,
) {
Ok(res) => {
res.0
}
Err(e) => {
security_info!(
"Failed to validate the identity of the remote participant {:?}. Info: {}",
remote_guid.prefix,
e.msg
);
if self
.local_dp_sec_attributes
.allow_unauthenticated_participants
{
security_info!(
"Treating the remote participant {:?} as Unauthenticated, since configuration allows \
this.",
remote_guid.prefix,
);
return AuthenticationStatus::Unauthenticated;
} else {
return AuthenticationStatus::Rejected;
}
}
};
info!(
"Validated identity of remote participant {:?}",
remote_guid.prefix
);
discovery_db_write(discovery_db).update_participant(participant_data);
self.update_participant_authentication_status_and_notify_dp(
remote_guid.prefix,
AuthenticationStatus::Authenticating,
discovery_db,
discovery_updated_sender,
);
match outcome {
ValidationOutcome::PendingHandshakeRequest => {
self.update_handshake_state(remote_guid.prefix, DiscHandshakeState::PendingRequestSend);
self.try_sending_new_handshake_request_message(
remote_guid.prefix,
discovery_db,
auth_msg_writer,
);
AuthenticationStatus::Authenticating }
ValidationOutcome::PendingHandshakeMessage => {
self.update_handshake_state(
remote_guid.prefix,
DiscHandshakeState::PendingRequestMessage,
);
debug!(
"Waiting for a handshake request from remote participant {:?}",
remote_guid.prefix
);
AuthenticationStatus::Authenticating }
outcome => {
error!(
"Got an unexpected outcome when validating remote identity. Validation outcome: \
{outcome:?}. Remote guid: {remote_guid:?}"
);
AuthenticationStatus::Rejected }
}
}
fn update_participant_authentication_status_and_notify_dp(
&mut self,
participant_guid_prefix: GuidPrefix,
new_status: AuthenticationStatus,
discovery_db: &Arc<RwLock<DiscoveryDB>>,
discovery_updated_sender: &mio_channel::SyncSender<DiscoveryNotificationType>,
) {
let mut db = discovery_db_write(discovery_db);
db.update_authentication_status(participant_guid_prefix, new_status);
send_discovery_notification(
discovery_updated_sender,
DiscoveryNotificationType::ParticipantAuthenticationStatusChanged {
guid_prefix: participant_guid_prefix,
},
);
}
fn create_handshake_request_message(
&mut self,
discovery_db: &Arc<RwLock<DiscoveryDB>>,
remote_guid_prefix: GuidPrefix,
) -> SecurityResult<ParticipantStatelessMessage> {
let my_ser_data = self.get_serialized_local_participant_data(discovery_db)?;
let (validation_outcome, request_token) = self
.security_plugins
.get_plugins()
.begin_handshake_request(
self.local_participant_guid.prefix,
remote_guid_prefix,
my_ser_data,
)?;
if validation_outcome != ValidationOutcome::PendingHandshakeMessage {
return Err(create_security_error_and_log!(
"Received an unexpected validation outcome from begin_handshake_request. Outcome: {:?}",
validation_outcome
));
}
let request_message = self.new_stateless_message(
GMCLASSID_SECURITY_AUTH_HANDSHAKE,
remote_guid_prefix,
None,
request_token,
);
Ok(request_message)
}
fn try_sending_new_handshake_request_message(
&mut self,
remote_guid_prefix: GuidPrefix,
discovery_db: &Arc<RwLock<DiscoveryDB>>,
auth_msg_writer: &no_key::DataWriter<ParticipantStatelessMessage>,
) {
debug!("Sending a handshake request message to remote participant {remote_guid_prefix:?}");
let request_message =
match self.create_handshake_request_message(discovery_db, remote_guid_prefix) {
Ok(message) => message,
Err(e) => {
error!(
"Failed to create a handshake request message. Reason: {}. Remote guid prefix: {:?}. \
Trying again later.",
e.msg, remote_guid_prefix
);
return;
}
};
self.stored_authentication_messages.insert(
remote_guid_prefix,
StoredAuthenticationMessage::new(request_message.clone()),
);
let _ = auth_msg_writer.write(request_message, None).map_err(|err| {
warn!(
"Failed to send a handshake request message. Remote GUID prefix: {remote_guid_prefix:?}. \
Info: {err}. Trying to resend the message later."
);
});
self.update_handshake_state(remote_guid_prefix, DiscHandshakeState::PendingReplyMessage);
}
pub fn resend_cached_secure_discovery_messages(
&mut self,
auth_msg_writer: &no_key::DataWriter<ParticipantStatelessMessage>,
key_exchange_writer: &no_key::DataWriter<ParticipantVolatileMessageSecure>,
) {
for (guid_prefix, stored_message) in self.stored_authentication_messages.iter_mut() {
if self.handshake_states.get(guid_prefix)
!= Some(&DiscHandshakeState::CompletedWithFinalMessageSent)
{
match auth_msg_writer.write(stored_message.message.clone(), None) {
Ok(()) => {
stored_message.remaining_resend_counter -= 1;
debug!(
"Resent an unanswered authentication message to remote participant {:?}. Resending \
at most {} more times.",
guid_prefix, stored_message.remaining_resend_counter,
);
}
Err(err) => {
debug!(
"Failed to resend an unanswered authentication message to remote participant \
{guid_prefix:?}. Error: {err}. Retrying later."
);
}
}
}
}
self
.stored_authentication_messages
.retain(|_guid_prefix, message| message.remaining_resend_counter > 0);
let mut msgs_still_to_cache = HashSet::new();
for msg in self.cached_key_exchange_messages_for_resend.iter() {
match self.send_key_exchange_message(key_exchange_writer, msg) {
Ok(()) => {
debug!(
"Successfully sent a cached key exchange message to {:?}",
msg.generic.destination_participant_guid
);
}
Err(e) => {
create_security_error_and_log!(
"Failed again to send some crypto keys to {:?}: {e}",
msg.generic.destination_participant_guid
);
msgs_still_to_cache.insert(msg.clone());
}
}
}
self.cached_key_exchange_messages_for_resend = msgs_still_to_cache;
}
fn reset_stored_message_resend_counter(&mut self, remote_guid_prefix: &GuidPrefix) {
if let Some(msg) = self
.stored_authentication_messages
.get_mut(remote_guid_prefix)
{
msg.remaining_resend_counter = STORED_AUTH_MESSAGE_MAX_RESEND_COUNT;
} else {
debug!(
"Did not find a stored authentication message for remote participant \
{remote_guid_prefix:?}"
);
}
}
pub fn participant_stateless_message_read(
&mut self,
message: &ParticipantStatelessMessage,
discovery_db: &Arc<RwLock<DiscoveryDB>>,
discovery_updated_sender: &mio_channel::SyncSender<DiscoveryNotificationType>,
auth_msg_writer: &no_key::DataWriter<ParticipantStatelessMessage>,
) {
if !self.is_stateless_msg_for_local_participant(message) {
trace!("Ignoring a ParticipantStatelessMessage, since its not meant for me.");
return;
}
if message.generic.message_class_id != GMCLASSID_SECURITY_AUTH_HANDSHAKE {
debug!(
"Received a ParticipantStatelessMessage with an unknown GenericMessageClassID: {}",
message.generic.message_class_id
);
return;
}
let remote_guid_prefix = message.generic.source_guid_prefix();
match self.get_handshake_state(&remote_guid_prefix) {
None => {
trace!(
"Received a handshake message from remote participant {remote_guid_prefix:?}. Ignoring, \
since no handshake going on."
);
}
Some(DiscHandshakeState::PendingRequestSend) => {
self.try_sending_new_handshake_request_message(
remote_guid_prefix,
discovery_db,
auth_msg_writer,
);
}
Some(DiscHandshakeState::PendingRequestMessage) => {
self.handshake_on_pending_request_message(message, discovery_db, auth_msg_writer);
}
Some(DiscHandshakeState::PendingReplyMessage) => {
self.handshake_on_pending_reply_message(
message,
discovery_db,
auth_msg_writer,
discovery_updated_sender,
);
}
Some(DiscHandshakeState::PendingFinalMessage) => {
self.handshake_on_pending_final_message(message, discovery_db, discovery_updated_sender);
}
Some(DiscHandshakeState::CompletedWithFinalMessageSent) => {
debug!("Resending a final handshake message to remote participant {remote_guid_prefix:?}");
self.resend_final_handshake_message(remote_guid_prefix, auth_msg_writer);
}
Some(DiscHandshakeState::CompletedWithFinalMessageReceived) => {
debug!(
"Received a handshake message from remote participant {remote_guid_prefix:?}. Handshake \
with this participant has already been completed by receiving the final message. \
Nothing for us to do anymore."
);
}
}
}
fn handshake_on_pending_request_message(
&mut self,
received_message: &ParticipantStatelessMessage,
discovery_db: &Arc<RwLock<DiscoveryDB>>,
auth_msg_writer: &no_key::DataWriter<ParticipantStatelessMessage>,
) {
let remote_guid_prefix = received_message.generic.source_guid_prefix();
debug!(
"Received a handshake message from remote participant {remote_guid_prefix:?}. Expecting a \
handshake request message."
);
let local_guid_prefix = self.local_participant_guid.prefix;
let handshake_token = match get_handshake_token_from_stateless_message(received_message) {
Some(token) => token,
None => {
error!(
"A ParticipantStatelessMessage does not contain a message token. Remote guid prefix: \
{remote_guid_prefix:?}"
);
return;
}
};
let my_serialized_data =
if let Ok(data) = self.get_serialized_local_participant_data(discovery_db) {
data
} else {
error!(" Could not get serialized local participant data");
return;
};
let result = self.security_plugins.get_plugins().begin_handshake_reply(
local_guid_prefix,
remote_guid_prefix,
handshake_token,
my_serialized_data,
);
match result {
Ok((ValidationOutcome::PendingHandshakeMessage, reply_token)) => {
let reply_message = self.new_stateless_message(
GMCLASSID_SECURITY_AUTH_HANDSHAKE,
remote_guid_prefix,
Some(received_message),
reply_token,
);
debug!("Sending a handshake reply message to remote participant {remote_guid_prefix:?}");
let _ = auth_msg_writer
.write(reply_message.clone(), None)
.map_err(|err| {
error!(
"Failed to send a handshake reply message. Remote GUID prefix: \
{remote_guid_prefix:?}. Info: {err}. Trying to resend the message later."
);
});
self.stored_authentication_messages.insert(
remote_guid_prefix,
StoredAuthenticationMessage::new(reply_message),
);
self.update_handshake_state(remote_guid_prefix, DiscHandshakeState::PendingFinalMessage);
}
Ok((other_outcome, _reply_token)) => {
error!(
"Unexpected validation outcome from begin_handshake_reply. Outcome: {other_outcome:?}. \
Remote guid prefix: {remote_guid_prefix:?}"
);
}
Err(e) => {
error!(
"Replying to a handshake request failed: {e}. Remote guid prefix: {remote_guid_prefix:?}"
);
}
}
}
fn handshake_on_pending_reply_message(
&mut self,
received_message: &ParticipantStatelessMessage,
discovery_db: &Arc<RwLock<DiscoveryDB>>,
auth_msg_writer: &no_key::DataWriter<ParticipantStatelessMessage>,
discovery_updated_sender: &mio_channel::SyncSender<DiscoveryNotificationType>,
) {
let remote_guid_prefix = received_message.generic.source_guid_prefix();
debug!(
"Received a handshake message from remote participant {remote_guid_prefix:?}. Expecting a \
handshake reply message."
);
if !self.check_is_stateless_msg_related_to_our_msg(received_message, remote_guid_prefix) {
warn!(
"Received handshake message that is not related to the message that we have sent. \
Ignoring. Remote guid prefix: {remote_guid_prefix:?}"
);
return;
}
let handshake_token = match get_handshake_token_from_stateless_message(received_message) {
Some(token) => token,
None => {
error!(
"A ParticipantStatelessMessage does not contain a message token. Ignoring the message. \
Remote guid prefix: {remote_guid_prefix:?}"
);
return;
}
};
let result = self
.security_plugins
.get_plugins()
.process_handshake(remote_guid_prefix, handshake_token);
match result {
Ok((ValidationOutcome::OkFinalMessage, Some(final_message_token))) => {
let final_message = self.new_stateless_message(
GMCLASSID_SECURITY_AUTH_HANDSHAKE,
remote_guid_prefix,
Some(received_message),
final_message_token,
);
debug!("Sending a final handshake message to remote participant {remote_guid_prefix:?}");
let _ = auth_msg_writer
.write(final_message.clone(), None)
.map_err(|err| {
error!(
"Failed to send a final handshake message. Remote GUID prefix: \
{remote_guid_prefix:?}. Info: {err}. Trying to resend the message later."
);
});
self.stored_authentication_messages.insert(
remote_guid_prefix,
StoredAuthenticationMessage::new(final_message),
);
self.update_handshake_state(
remote_guid_prefix,
DiscHandshakeState::CompletedWithFinalMessageSent,
);
self.on_remote_participant_authenticated(
remote_guid_prefix,
discovery_db,
discovery_updated_sender,
);
}
Ok((other_outcome, _token_opt)) => {
error!(
"Received an unexpected validation outcome from the security plugins. Outcome: \
{other_outcome:?}. Remote guid prefix: {remote_guid_prefix:?}"
);
}
Err(e) => {
error!(
"Validating handshake reply message failed. Error: {e}. Remote guid prefix: \
{remote_guid_prefix:?}"
);
self.reset_stored_message_resend_counter(&remote_guid_prefix);
}
}
}
fn handshake_on_pending_final_message(
&mut self,
received_message: &ParticipantStatelessMessage,
discovery_db: &Arc<RwLock<DiscoveryDB>>,
discovery_updated_sender: &mio_channel::SyncSender<DiscoveryNotificationType>,
) {
let remote_guid_prefix = received_message.generic.source_guid_prefix();
debug!(
"Received a handshake message from remote participant {remote_guid_prefix:?}. Expecting a \
final handshake message"
);
if !self.check_is_stateless_msg_related_to_our_msg(received_message, remote_guid_prefix) {
warn!(
"Received handshake message that is not related to the message that we have sent. \
Ignoring. Remote guid prefix: {remote_guid_prefix:?}"
);
return;
}
let handshake_token = match get_handshake_token_from_stateless_message(received_message) {
Some(token) => token,
None => {
error!(
"A ParticipantStatelessMessage does not contain a message token. Ignoring the message. \
Remote guid prefix: {remote_guid_prefix:?}"
);
return;
}
};
let result = self
.security_plugins
.get_plugins()
.process_handshake(remote_guid_prefix, handshake_token);
match result {
Ok((ValidationOutcome::Ok, None)) => {
self.update_handshake_state(
remote_guid_prefix,
DiscHandshakeState::CompletedWithFinalMessageReceived,
);
self
.stored_authentication_messages
.remove(&remote_guid_prefix);
self.on_remote_participant_authenticated(
remote_guid_prefix,
discovery_db,
discovery_updated_sender,
);
}
Ok((other_outcome, _token_opt)) => {
error!(
"Received an unexpected validation outcome from the security plugins. Outcome: \
{other_outcome:?}. Remote guid prefix: {remote_guid_prefix:?}"
);
}
Err(e) => {
error!(
"Validating final handshake message failed. Error: {e}. Remote guid prefix: \
{remote_guid_prefix:?}"
);
self.reset_stored_message_resend_counter(&remote_guid_prefix);
}
}
}
pub fn volatile_message_secure_read(&mut self, msg: &ParticipantVolatileMessageSecure) {
let dest_guid = msg.generic.destination_participant_guid;
let is_for_us = (dest_guid == GUID::GUID_UNKNOWN) || (dest_guid == self.local_participant_guid);
if !is_for_us {
trace!(
"Ignoring ParticipantVolatileMessageSecure message since it's not for us. dest_guid: \
{dest_guid:?}"
);
return;
}
let crypto_tokens = msg
.generic
.message_data
.iter()
.map(|dh| CryptoToken::from(dh.clone()))
.collect();
match msg.generic.message_class_id.as_str() {
GMCLASSID_SECURITY_PARTICIPANT_CRYPTO_TOKENS => {
if dest_guid != self.local_participant_guid {
debug!("Invalid destination participant guid, ignoring participant crypto tokens");
return;
}
let remote_participant_guidp = msg.generic.message_identity.writer_guid.prefix;
if let Err(e) = self
.security_plugins
.get_plugins()
.set_remote_participant_crypto_tokens(remote_participant_guidp, crypto_tokens)
{
create_security_error_and_log!(
"Failed to set remote participant crypto tokens: {}. Remote: {:?}",
e,
remote_participant_guidp
);
} else {
info!("Set crypto tokens for remote participant {remote_participant_guidp:?}");
}
}
GMCLASSID_SECURITY_DATAWRITER_CRYPTO_TOKENS => {
let set_result = self
.security_plugins
.get_plugins()
.set_remote_writer_crypto_tokens(
msg.generic.source_endpoint_guid,
msg.generic.destination_endpoint_guid,
crypto_tokens,
);
if let Err(e) = set_result {
warn!(
"Failed to set remote writer {:?} crypto tokens. Storing them & trying again later. \
Error message: {e}",
msg.generic.source_endpoint_guid
);
self.store_received_volatile_message(msg.clone());
} else {
info!(
"Set crypto tokens for remote writer {:?}",
msg.generic.source_endpoint_guid
);
}
}
GMCLASSID_SECURITY_DATAREADER_CRYPTO_TOKENS => {
let set_result = self
.security_plugins
.get_plugins()
.set_remote_reader_crypto_tokens(
msg.generic.source_endpoint_guid,
msg.generic.destination_endpoint_guid,
crypto_tokens,
);
if let Err(e) = set_result {
warn!(
"Failed to set remote reader {:?} crypto tokens. Storing them & trying again later. \
Error message: {e}",
msg.generic.source_endpoint_guid
);
self.store_received_volatile_message(msg.clone());
} else {
info!(
"Set crypto tokens for remote reader {:?}",
msg.generic.source_endpoint_guid
);
}
}
other => {
debug!("Unknown message_class_id in a volatile message: {other}");
}
}
}
fn store_received_volatile_message(&mut self, msg: ParticipantVolatileMessageSecure) {
let local_endpoint_guid = msg.generic.destination_endpoint_guid;
let remote_endpoint_guid = msg.generic.source_endpoint_guid;
self
.cached_received_key_exchange_messages
.insert((local_endpoint_guid, remote_endpoint_guid), msg);
}
fn on_remote_participant_authenticated(
&mut self,
remote_guid_prefix: GuidPrefix,
discovery_db: &Arc<RwLock<DiscoveryDB>>,
discovery_updated_sender: &mio_channel::SyncSender<DiscoveryNotificationType>,
) {
security_info!("Authenticated successfully Participant {remote_guid_prefix:?}");
match self.validate_remote_participant_permissions(remote_guid_prefix, discovery_db) {
Ok(()) => {
debug!("Validated permissions for remote participant {remote_guid_prefix:?}");
}
Err(e) => {
security_info!(
"Validating permissions for remote failed: {}. Rejecting the remote. Guid prefix: {:?}",
e,
remote_guid_prefix
);
self.update_participant_authentication_status_and_notify_dp(
remote_guid_prefix,
AuthenticationStatus::Rejected,
discovery_db,
discovery_updated_sender,
);
return;
}
}
if self.local_dp_sec_attributes.is_access_protected {
let check_result = self
.security_plugins
.get_plugins()
.check_remote_participant(self.domain_id, remote_guid_prefix);
match check_result {
Ok(check_passed) => {
if check_passed {
security_info!(
"Allowing remote participant {:?} to join the domain.",
remote_guid_prefix
);
} else {
security_info!(
"Remote participant {:?} is not allowed to join the domain. Rejecting the remote.",
remote_guid_prefix
);
self.update_participant_authentication_status_and_notify_dp(
remote_guid_prefix,
AuthenticationStatus::Rejected,
discovery_db,
discovery_updated_sender,
);
return;
}
}
Err(e) => {
create_security_error_and_log!(
"Something went wrong in checking remote participant permissions: {}. Rejecting the \
remote {:?}.",
e,
remote_guid_prefix
);
self.update_participant_authentication_status_and_notify_dp(
remote_guid_prefix,
AuthenticationStatus::Rejected,
discovery_db,
discovery_updated_sender,
);
return;
}
}
}
if let Err(e) = register_remote_to_crypto(
self.local_participant_guid.prefix,
remote_guid_prefix,
&self.security_plugins,
) {
create_security_error_and_log!(
"Failed to register remote participant {:?} to crypto plugin: {}. Rejecting remote",
remote_guid_prefix,
e,
);
self.update_participant_authentication_status_and_notify_dp(
remote_guid_prefix,
AuthenticationStatus::Rejected,
discovery_db,
discovery_updated_sender,
);
return;
};
self.update_participant_authentication_status_and_notify_dp(
remote_guid_prefix,
AuthenticationStatus::Authenticated,
discovery_db,
discovery_updated_sender,
);
}
pub fn start_key_exchange_with_remote_participant(
&mut self,
remote_guid_prefix: GuidPrefix,
key_exchange_writer: &no_key::DataWriter<ParticipantVolatileMessageSecure>,
discovery_db: &Arc<RwLock<DiscoveryDB>>,
) {
let remotes_builtin_endpoints =
match discovery_db_read(discovery_db).find_participant_proxy(remote_guid_prefix) {
Some(data) => data.available_builtin_endpoints,
None => {
error!("Could not find participant {remote_guid_prefix:?} from DiscoveryDB");
return;
}
};
let crypto_tokens_res = self
.security_plugins
.get_plugins()
.create_local_participant_crypto_tokens(remote_guid_prefix); match crypto_tokens_res {
Err(e) => {
create_security_error_and_log!(
"Failed to create participant crypto tokens: {e }. Remote: {remote_guid_prefix:?}"
);
}
Ok(tokens) => {
let message = self.new_volatile_message(
GMCLASSID_SECURITY_PARTICIPANT_CRYPTO_TOKENS,
key_exchange_writer.guid(),
GUID::GUID_UNKNOWN, remote_guid_prefix,
GUID::GUID_UNKNOWN, &tokens,
);
if let Err(e) = self.send_key_exchange_message(key_exchange_writer, &message) {
create_security_error_and_log!(
"Failed to send participant crypto tokens to {remote_guid_prefix:?}: {e}. Trying \
again later."
);
self.cached_key_exchange_messages_for_resend.insert(message);
} else {
debug!("Sent participant crypto tokens to {remote_guid_prefix:?}.");
}
}
}
for (writer_eid, reader_eid, reader_endpoint, _reader_qos) in SECURE_BUILTIN_READERS_INIT_LIST {
if remotes_builtin_endpoints.contains(*reader_endpoint)
&& *reader_eid != EntityId::P2P_BUILTIN_PARTICIPANT_VOLATILE_SECURE_READER
{
let remote_reader_guid = GUID::new(remote_guid_prefix, *reader_eid);
let local_writer_guid = self.local_participant_guid.from_prefix(*writer_eid);
let crypto_tokens_res = self
.security_plugins
.get_plugins()
.create_local_writer_crypto_tokens(local_writer_guid, remote_reader_guid);
match crypto_tokens_res {
Err(e) => {
create_security_error_and_log!(
"Failed to create local writer crypto tokens: {e}. Remote: {remote_guid_prefix:?}"
);
}
Ok(tokens) => {
let message = self.new_volatile_message(
GMCLASSID_SECURITY_DATAWRITER_CRYPTO_TOKENS,
key_exchange_writer.guid(),
local_writer_guid,
remote_guid_prefix,
remote_reader_guid,
&tokens,
);
if let Err(e) = self.send_key_exchange_message(key_exchange_writer, &message) {
create_security_error_and_log!(
"Failed to send local writer {writer_eid:?} crypto tokens to \
{remote_reader_guid:?}: {e}. Trying again later."
);
self.cached_key_exchange_messages_for_resend.insert(message);
} else {
debug!(
"Sent built-in writer {local_writer_guid:?} crypto tokens to remote \
{remote_guid_prefix:?}."
);
}
}
}
}
}
for (writer_eid, reader_eid, writer_endpoint, _writer_qos) in SECURE_BUILTIN_WRITERS_INIT_LIST {
if remotes_builtin_endpoints.contains(*writer_endpoint)
&& *writer_eid != EntityId::P2P_BUILTIN_PARTICIPANT_VOLATILE_SECURE_WRITER
{
let remote_writer_guid = GUID::new(remote_guid_prefix, *writer_eid);
let local_reader_guid = self.local_participant_guid.from_prefix(*reader_eid);
let crypto_tokens_res = self
.security_plugins
.get_plugins()
.create_local_reader_crypto_tokens(local_reader_guid, remote_writer_guid);
match crypto_tokens_res {
Err(e) => {
create_security_error_and_log!(
"Failed to create local reader crypto tokens: {e}. Remote: {remote_guid_prefix:?}"
);
}
Ok(tokens) => {
let message = self.new_volatile_message(
GMCLASSID_SECURITY_DATAREADER_CRYPTO_TOKENS,
key_exchange_writer.guid(),
local_reader_guid,
remote_guid_prefix,
remote_writer_guid,
&tokens,
);
if let Err(e) = self.send_key_exchange_message(key_exchange_writer, &message) {
create_security_error_and_log!(
"Failed to send local reader {reader_eid:?} crypto tokens to \
{remote_writer_guid:?}: {e}. Trying again later."
);
self.cached_key_exchange_messages_for_resend.insert(message);
} else {
debug!(
"Sent built-in reader {local_reader_guid:?} crypto tokens to remote \
{remote_guid_prefix:?}."
);
}
}
}
}
}
}
pub fn start_key_exchange_with_remote_endpoint(
&mut self,
local_endpoint_guid: GUID,
remote_endpoint_guid: GUID,
key_exchange_writer: &no_key::DataWriter<ParticipantVolatileMessageSecure>,
discovery_db: &Arc<RwLock<DiscoveryDB>>,
) {
let remote_is_writer = remote_endpoint_guid.entity_id.entity_kind.is_writer();
let register_result = if remote_is_writer {
self
.security_plugins
.get_plugins()
.register_matched_remote_writer_if_not_already(remote_endpoint_guid, local_endpoint_guid)
} else {
self
.security_plugins
.get_plugins()
.register_matched_remote_reader_if_not_already(
remote_endpoint_guid,
local_endpoint_guid,
self
.relay_only_remote_readers
.contains(&remote_endpoint_guid),
)
};
if let Err(e) = register_result {
create_security_error_and_log!(
"Failed to register remote endpoint {:?} to crypto plugin: {}",
remote_endpoint_guid,
e,
);
}
if let Some(msg) = self
.cached_received_key_exchange_messages
.get(&(local_endpoint_guid, remote_endpoint_guid))
{
let crypto_tokens = msg
.generic
.message_data
.iter()
.map(|dh| CryptoToken::from(dh.clone()))
.collect();
let set_res = if remote_is_writer {
self
.security_plugins
.get_plugins()
.set_remote_writer_crypto_tokens(remote_endpoint_guid, local_endpoint_guid, crypto_tokens)
} else {
self
.security_plugins
.get_plugins()
.set_remote_reader_crypto_tokens(remote_endpoint_guid, local_endpoint_guid, crypto_tokens)
};
if let Err(e) = set_res {
create_security_error_and_log!(
"Failed to set stored remote reader crypto tokens: {}. Remote: {:?}",
e,
remote_endpoint_guid
);
} else {
debug!("Set stored remote crypto tokens. Remote: {remote_endpoint_guid:?}");
self
.cached_received_key_exchange_messages
.remove(&(local_endpoint_guid, remote_endpoint_guid));
}
}
let we_have_sent_ours = self
.user_data_endpoints_with_keys_already_sent_to
.contains(&remote_endpoint_guid);
if we_have_sent_ours {
return;
}
let sec_info_opt = if remote_is_writer {
discovery_db_read(discovery_db)
.get_topic_writer(&remote_endpoint_guid)
.map(|writer| writer.publication_topic_data.security_info().clone())
} else {
discovery_db_read(discovery_db)
.get_topic_reader(&remote_endpoint_guid)
.map(|reader| reader.subscription_topic_data.security_info().clone())
};
let sec_attr = match sec_info_opt.flatten() {
Some(info) => EndpointSecurityAttributes::from(info),
None => {
create_security_error_and_log!(
"Could not find EndpointSecurityAttributes for remote {:?}",
remote_endpoint_guid,
);
return;
}
};
#[allow(clippy::if_same_then_else)] let need_to_send_keys = if remote_is_writer {
sec_attr.is_payload_protected || sec_attr.is_submessage_protected
} else {
sec_attr.is_payload_protected || sec_attr.is_submessage_protected
};
if !need_to_send_keys {
trace!("Key exchange is not needed with remote {remote_endpoint_guid:?}");
self
.user_data_endpoints_with_keys_already_sent_to
.insert(remote_endpoint_guid);
return;
}
let crypto_tokens_res = if remote_is_writer {
self
.security_plugins
.get_plugins()
.create_local_reader_crypto_tokens(local_endpoint_guid, remote_endpoint_guid)
} else {
self
.security_plugins
.get_plugins()
.create_local_writer_crypto_tokens(local_endpoint_guid, remote_endpoint_guid)
};
let crypto_tokens = match crypto_tokens_res {
Ok(tokens) => tokens,
Err(e) => {
create_security_error_and_log!(
"Failed to create CryptoTokens: {}. Local endpoint: {:?}",
e,
local_endpoint_guid,
);
return;
}
};
let remote_is_us = remote_endpoint_guid.prefix == self.local_participant_guid.prefix;
if remote_is_us {
let set_res = if remote_is_writer {
self
.security_plugins
.get_plugins()
.set_remote_writer_crypto_tokens(remote_endpoint_guid, local_endpoint_guid, crypto_tokens)
} else {
self
.security_plugins
.get_plugins()
.set_remote_reader_crypto_tokens(remote_endpoint_guid, local_endpoint_guid, crypto_tokens)
};
if let Err(e) = set_res {
create_security_error_and_log!(
"Failed to set our own crypto tokens as remote tokens: {}. Guid: {:?}",
e,
remote_endpoint_guid
);
return;
} else {
debug!("Set our own crypto tokens as remote tokens. Guid: {remote_endpoint_guid:?}");
}
} else {
let vol_msg = if remote_is_writer {
self.new_volatile_message(
GMCLASSID_SECURITY_DATAREADER_CRYPTO_TOKENS,
key_exchange_writer.guid(),
local_endpoint_guid,
remote_endpoint_guid.prefix,
remote_endpoint_guid,
&crypto_tokens,
)
} else {
self.new_volatile_message(
GMCLASSID_SECURITY_DATAWRITER_CRYPTO_TOKENS,
key_exchange_writer.guid(),
local_endpoint_guid,
remote_endpoint_guid.prefix,
remote_endpoint_guid,
&crypto_tokens,
)
};
if let Err(e) = self.send_key_exchange_message(key_exchange_writer, &vol_msg) {
create_security_error_and_log!(
"Failed to send local endpoint {:?} crypto tokens to {:?}: {e}. Trying again later.",
local_endpoint_guid.entity_id,
remote_endpoint_guid
);
self.cached_key_exchange_messages_for_resend.insert(vol_msg);
} else {
security_info!(
"Sent local endpoint {:?} crypto tokens to {:?}.",
local_endpoint_guid.entity_id,
remote_endpoint_guid
);
}
}
self
.user_data_endpoints_with_keys_already_sent_to
.insert(remote_endpoint_guid);
}
fn validate_remote_participant_permissions(
&mut self,
remote_guid_prefix: GuidPrefix,
discovery_db: &Arc<RwLock<DiscoveryDB>>,
) -> SecurityResult<()> {
let mut sec_plugins = self.security_plugins.get_plugins();
let permissions_token = discovery_db_read(discovery_db)
.find_participant_proxy(remote_guid_prefix)
.and_then(|data| data.permissions_token.clone())
.ok_or_else(|| security_error("Could not get PermissionsToken from DiscoveryDB"))?;
let auth_credential_token =
sec_plugins.get_authenticated_peer_credential_token(remote_guid_prefix)?;
sec_plugins.validate_remote_permissions(
self.local_participant_guid.prefix,
remote_guid_prefix,
&permissions_token,
&auth_credential_token,
)
}
fn send_key_exchange_message(
&self,
key_exchange_writer: &no_key::DataWriter<ParticipantVolatileMessageSecure>,
message: &ParticipantVolatileMessageSecure,
) -> Result<(), String> {
let remote_guidp = message.generic.destination_participant_guid.prefix;
let opts = WriteOptionsBuilder::new()
.to_single_reader(GUID::new(
remote_guidp,
EntityId::P2P_BUILTIN_PARTICIPANT_VOLATILE_SECURE_READER,
))
.build();
if let Err(e) = key_exchange_writer.write_with_options(message.clone(), opts) {
Err(e.to_string())
} else {
Ok(())
}
}
fn resend_final_handshake_message(
&self,
remote_guid_prefix: GuidPrefix,
auth_msg_writer: &no_key::DataWriter<ParticipantStatelessMessage>,
) {
if let Some(stored_msg) = self.stored_authentication_messages.get(&remote_guid_prefix) {
let _ = auth_msg_writer
.write(stored_msg.message.clone(), None)
.map_err(|err| {
warn!(
"Failed to send a final handshake message. Remote GUID prefix: \
{remote_guid_prefix:?}. Error: {err}"
);
});
} else {
warn!(
"Did not find the final handshake request to send. Remote guid prefix: \
{remote_guid_prefix:?}"
);
}
}
fn is_stateless_msg_for_local_participant(&self, message: &ParticipantStatelessMessage) -> bool {
let destination_participant_guid = message.generic.destination_participant_guid;
destination_participant_guid == self.local_participant_guid
}
fn check_is_stateless_msg_related_to_our_msg(
&self,
message: &ParticipantStatelessMessage,
sender_guid_prefix: GuidPrefix,
) -> bool {
let message_sent_by_us = match self.stored_authentication_messages.get(&sender_guid_prefix) {
Some(msg) => &msg.message,
None => {
debug!("Did not find an unanswered message for guid prefix {sender_guid_prefix:?}");
return false;
}
};
message.generic.related_message_identity == message_sent_by_us.generic.message_identity
}
fn get_handshake_state(&self, remote_guid_prefix: &GuidPrefix) -> Option<DiscHandshakeState> {
self.handshake_states.get(remote_guid_prefix).copied()
}
fn update_handshake_state(&mut self, remote_guid_prefix: GuidPrefix, state: DiscHandshakeState) {
self.handshake_states.insert(remote_guid_prefix, state);
}
fn get_serialized_local_participant_data(
&self,
discovery_db: &Arc<RwLock<DiscoveryDB>>,
) -> SecurityResult<Vec<u8>> {
let my_ser_data = discovery_db_read(discovery_db)
.find_participant_proxy(self.local_participant_guid.prefix)
.expect("My own participant data disappeared from DiscoveryDB")
.to_pl_cdr_bytes(RepresentationIdentifier::PL_CDR_BE)
.map_err(|e| create_security_error_and_log!("Serializing participant data failed: {e}"))?;
Ok(my_ser_data.to_vec())
}
fn new_stateless_message(
&mut self,
message_class_id: &str,
destination_guid_prefix: GuidPrefix,
related_message_opt: Option<&ParticipantStatelessMessage>,
handshake_token: HandshakeMessageToken,
) -> ParticipantStatelessMessage {
let generic_message = self.generic_message_helper.new_message(
message_class_id,
self.local_participant_guid, GUID::GUID_UNKNOWN, related_message_opt.map(|msg| &msg.generic),
destination_guid_prefix,
GUID::GUID_UNKNOWN, vec![handshake_token.data_holder],
);
ParticipantStatelessMessage::from(generic_message)
}
fn new_volatile_message(
&mut self,
message_class_id: &str,
volatile_writer_guid: GUID,
source_endpoint_guid: GUID,
destination_guid_prefix: GuidPrefix,
destination_endpoint_guid: GUID,
crypto_tokens: &[CryptoToken],
) -> ParticipantVolatileMessageSecure {
let generic_message = self.generic_message_helper.new_message(
message_class_id,
volatile_writer_guid,
source_endpoint_guid,
None, destination_guid_prefix,
destination_endpoint_guid,
crypto_tokens
.iter()
.map(|token| token.data_holder.clone())
.collect(),
);
ParticipantVolatileMessageSecure::from(generic_message)
}
}
fn send_discovery_notification(
discovery_updated_sender: &mio_channel::SyncSender<DiscoveryNotificationType>,
dntype: DiscoveryNotificationType,
) {
match discovery_updated_sender.send(dntype) {
Ok(_) => (),
Err(e) => error!("Failed to send DiscoveryNotification {e:?}"),
}
}
fn get_handshake_token_from_stateless_message(
message: &ParticipantStatelessMessage,
) -> Option<HandshakeMessageToken> {
let source_guid_prefix = message.generic.source_guid_prefix();
let message_data = &message.generic.message_data;
if message.generic.message_data.len() > 1 {
warn!(
"ParticipantStatelessMessage for handshake contains more than one data holder. Using only \
the first one. Source guid prefix: {source_guid_prefix:?}"
);
}
message_data
.first()
.map(|data_holder| HandshakeMessageToken::from(data_holder.clone()))
}
fn register_remote_to_crypto(
local_guidp: GuidPrefix,
remote_guidp: GuidPrefix,
security_plugins_handle: &SecurityPluginsHandle,
) -> SecurityResult<()> {
let shared_secret = security_plugins_handle
.get_plugins()
.get_shared_secret(remote_guidp); shared_secret
.and_then(|shared_secret| {
security_plugins_handle
.get_plugins()
.register_matched_remote_participant(remote_guidp, shared_secret)
})
.map_err(|e| {
create_security_error_and_log!(
"Failed to register remote participant with the crypto plugin: {}. Remote: {:?}",
e,
remote_guidp
)
})?;
trace!("Registered remote participant {remote_guidp:?} with the crypto plugin.");
for (writer_eid, reader_eid, _reader_endpoint, _reader_qos) in SECURE_BUILTIN_READERS_INIT_LIST {
let remote_reader_guid = GUID::new(remote_guidp, *reader_eid);
let local_writer_guid = GUID::new(local_guidp, *writer_eid);
security_plugins_handle
.get_plugins()
.register_matched_remote_reader_if_not_already(remote_reader_guid, local_writer_guid, false)
.map_err(|e| {
create_security_error_and_log!(
"Failed to register remote built-in reader {:?} to crypto plugin: {}",
remote_reader_guid,
e,
)
})?;
trace!("Registered remote reader with the crypto plugin. GUID: {remote_reader_guid:?}");
}
for (writer_eid, reader_eid, _writer_endpoint, _writer_qos) in SECURE_BUILTIN_WRITERS_INIT_LIST {
let remote_writer_guid = GUID::new(remote_guidp, *writer_eid);
let local_reader_guid = GUID::new(local_guidp, *reader_eid);
security_plugins_handle
.get_plugins()
.register_matched_remote_writer_if_not_already(remote_writer_guid, local_reader_guid)
.map_err(|e| {
create_security_error_and_log!(
"Failed to register remote built-in writer {:?} to crypto plugin: {}",
remote_writer_guid,
e,
)
})?;
trace!("Registered remote writer with the crypto plugin. GUID: {remote_writer_guid:?}");
}
Ok(())
}
struct ParticipantGenericMessageHelper {
next_seqnum: SequenceNumber,
}
impl ParticipantGenericMessageHelper {
pub fn new() -> Self {
Self {
next_seqnum: SequenceNumber::new(1),
}
}
fn get_next_seqnum(&mut self) -> SequenceNumber {
let next = self.next_seqnum;
self.next_seqnum = self.next_seqnum + SequenceNumber::new(1);
next
}
#[allow(clippy::too_many_arguments)]
pub fn new_message(
&mut self,
message_class_id: &str,
msg_identity_source_guid: GUID,
source_endpoint_guid: GUID,
related_message_opt: Option<&ParticipantGenericMessage>,
destination_guid_prefix: GuidPrefix,
destination_endpoint_guid: GUID,
data_holders: Vec<DataHolder>,
) -> ParticipantGenericMessage {
let message_identity = rpc::SampleIdentity {
writer_guid: msg_identity_source_guid,
sequence_number: self.get_next_seqnum(),
};
let related_message_identity = if let Some(msg) = related_message_opt {
msg.message_identity
} else {
rpc::SampleIdentity {
writer_guid: GUID::GUID_UNKNOWN,
sequence_number: SequenceNumber::zero(),
}
};
let destination_participant_guid = GUID::new(destination_guid_prefix, EntityId::PARTICIPANT);
ParticipantGenericMessage {
message_identity,
related_message_identity,
destination_participant_guid,
destination_endpoint_guid,
source_endpoint_guid,
message_class_id: message_class_id.to_string(),
message_data: data_holders,
}
}
}