use std::{
io,
pin::Pin,
sync::{Arc, Mutex},
task::{Context, Poll, Waker},
};
#[allow(unused_imports)]
use log::{debug, error, info, trace, warn};
use futures::stream::{FusedStream, Stream};
use mio_06::Evented;
use mio_extras::channel as mio_channel;
use mio_08::{event, Interest, Registry, Token};
use chrono::Utc;
use crate::{
dds::{qos::QosPolicyId, topic::TopicData},
discovery::SpdpDiscoveredParticipantData,
messages::{protocol_version::ProtocolVersion, vendor_id::VendorId},
mio_source::*,
structure::guid::GuidPrefix,
Duration, QosPolicies, GUID,
};
#[cfg(feature = "security")]
use crate::discovery::secure_discovery::AuthenticationStatus;
/// Common interface of objects that can report status events of type `E`.
///
/// One implementor exposes the same event source in several ways: as a
/// `mio_06::Evented`, as a `mio_08::event::Source`, as an async stream `S`,
/// and through a non-blocking receive call.
///
/// `S` is the concrete stream type returned by [`as_async_status_stream`];
/// it must be a fused stream yielding `E`.
///
/// [`as_async_status_stream`]: StatusEvented::as_async_status_stream
pub trait StatusEvented<'a, E, S>
where
  S: Stream<Item = E> + FusedStream,
{
  /// Returns this object as a mio 0.6 `Evented`.
  fn as_status_evented(&mut self) -> &dyn Evented;

  /// Returns this object as a mio 0.8 event `Source`.
  fn as_status_source(&mut self) -> &mut dyn mio_08::event::Source;

  /// Returns an async stream of status events that borrows `self` for `'a`.
  fn as_async_status_stream(&'a self) -> S;

  /// Returns a status event, or `None` if none was obtained.
  fn try_recv_status(&self) -> Option<E>;
}
pub(crate) fn sync_status_channel<T>(
capacity: usize,
) -> io::Result<(StatusChannelSender<T>, StatusChannelReceiver<T>)> {
let (signal_receiver, signal_sender) = make_poll_channel()?;
let (actual_sender, actual_receiver) = mio_channel::sync_channel(capacity);
let waker = Arc::new(Mutex::new(None));
Ok((
StatusChannelSender {
actual_sender,
signal_sender,
waker: Arc::clone(&waker),
},
StatusChannelReceiver {
actual_receiver: Mutex::new(actual_receiver),
signal_receiver,
waker,
},
))
}
/// Sending half of a status channel, as constructed by `sync_status_channel`.
///
/// The type is `Clone`; clones share the same waker slot through the `Arc`.
#[derive(Clone)]
pub struct StatusChannelSender<T> {
/// Channel end that carries the status values themselves.
actual_sender: mio_channel::SyncSender<T>,
/// Poll-event sender; `try_send` triggers it after attempting to send.
signal_sender: PollEventSender,
/// Waker slot shared with the receiving half (`None` when no waker is stored).
waker: Arc<Mutex<Option<Waker>>>,
}
/// Receiving half of a status channel, as constructed by `sync_status_channel`.
///
/// In this file it implements the mio 0.6 `Evented` trait, the mio 0.8
/// `event::Source` trait, and `StatusEvented`.
pub struct StatusChannelReceiver<T> {
/// Channel end that delivers the status values; kept behind a `Mutex` and
/// locked for every access.
actual_receiver: Mutex<mio_channel::Receiver<T>>,
/// Poll-event source; this is what gets registered through `event::Source`,
/// and it is drained on every `try_recv`.
signal_receiver: PollEventSource,
/// Waker slot shared with the sending half (`None` when no waker is stored).
waker: Arc<Mutex<Option<Waker>>>,
}
impl<T> StatusChannelSender<T> {
  /// Attempts to queue a status value without blocking, then notifies the
  /// receiving side.
  ///
  /// A full channel is not an error: the new value is dropped, a `trace!`
  /// message is logged, and the receiver is still notified.
  ///
  /// # Errors
  ///
  /// Any `TrySendError` other than `Full` is returned unchanged; in that case
  /// no notification is sent.
  ///
  /// # Panics
  ///
  /// Panics if the shared waker mutex is poisoned.
  pub fn try_send(&self, t: T) -> Result<(), mio_channel::TrySendError<T>> {
    // Take the waker lock before touching the channel. The async receiver
    // (`StatusReceiverStream::poll_next`) takes the same lock before it tries
    // to receive and stores its waker while still holding it, so a send
    // cannot slip in between its "channel is empty" check and the waker
    // registration.
    let mut waker_slot = self.waker.lock().unwrap();

    match self.actual_sender.try_send(t) {
      Ok(()) => {}
      Err(mio_channel::TrySendError::Full(_dropped)) => {
        trace!("StatusChannelSender cannot send new status changes, channel is full.");
      }
      Err(other_fail) => return Err(other_fail),
    }

    // Notify on both success and "full": signal the poll-based receiver,
    // then wake any parked async task. `take()` empties the slot, so a
    // stored waker is woken at most once.
    self.signal_sender.send();
    if let Some(waker) = waker_slot.take() {
      waker.wake();
    }
    Ok(())
  }
}
impl<T> StatusChannelReceiver<T> {
  /// Receives one status value without blocking.
  ///
  /// The signal source is drained first, then the data channel is locked and
  /// asked for a value.
  ///
  /// # Errors
  ///
  /// Passes through the `TryRecvError` of the underlying channel.
  ///
  /// # Panics
  ///
  /// Panics if the receiver mutex is poisoned.
  pub fn try_recv(&self) -> Result<T, std::sync::mpsc::TryRecvError> {
    self.signal_receiver.drain();
    let receiver = self.actual_receiver.lock().unwrap();
    receiver.try_recv()
  }

  /// Locks the shared waker slot and returns the guard.
  ///
  /// # Panics
  ///
  /// Panics if the waker mutex is poisoned.
  pub(crate) fn get_waker_update_lock(&self) -> std::sync::MutexGuard<'_, Option<Waker>> {
    Mutex::lock(&self.waker).unwrap()
  }
}
/// `StatusEvented` for the receiving half: every view is the receiver itself,
/// and the async view is a `StatusReceiverStream` borrowing it.
impl<'a, E> StatusEvented<'a, E, StatusReceiverStream<'a, E>> for StatusChannelReceiver<E> {
  /// The receiver is its own mio 0.6 `Evented`.
  fn as_status_evented(&mut self) -> &dyn Evented {
    self
  }

  /// The receiver is its own mio 0.8 event `Source`.
  fn as_status_source(&mut self) -> &mut dyn mio_08::event::Source {
    self
  }

  /// Wraps a borrow of this receiver in a fresh stream that is not yet
  /// terminated.
  fn as_async_status_stream(&'a self) -> StatusReceiverStream<'a, E> {
    let terminated = AtomicBool::new(false);
    StatusReceiverStream {
      sync_receiver: self,
      terminated,
    }
  }

  /// Non-blocking receive; any `try_recv` error is mapped to `None`.
  fn try_recv_status(&self) -> Option<E> {
    let received = self.try_recv();
    received.ok()
  }
}
/// mio 0.6 integration: all three operations lock the data channel receiver
/// and forward to it.
///
/// Each method panics if the receiver mutex is poisoned.
impl<E> Evented for StatusChannelReceiver<E> {
  /// Registers the underlying data channel receiver with `poll`.
  fn register(
    &self,
    poll: &mio_06::Poll,
    token: mio_06::Token,
    interest: mio_06::Ready,
    opts: mio_06::PollOpt,
  ) -> io::Result<()> {
    let receiver = self.actual_receiver.lock().unwrap();
    receiver.register(poll, token, interest, opts)
  }

  /// Re-registers the underlying data channel receiver with `poll`.
  fn reregister(
    &self,
    poll: &mio_06::Poll,
    token: mio_06::Token,
    interest: mio_06::Ready,
    opts: mio_06::PollOpt,
  ) -> io::Result<()> {
    let receiver = self.actual_receiver.lock().unwrap();
    receiver.reregister(poll, token, interest, opts)
  }

  /// Removes the underlying data channel receiver from `poll`.
  fn deregister(&self, poll: &mio_06::Poll) -> io::Result<()> {
    let receiver = self.actual_receiver.lock().unwrap();
    receiver.deregister(poll)
  }
}
/// mio 0.8 integration: all three operations are forwarded to the
/// `signal_receiver` poll-event source (not to the data channel receiver,
/// which is what the mio 0.6 `Evented` impl uses).
impl<T> event::Source for StatusChannelReceiver<T> {
/// Registers `signal_receiver` with `registry` under `token` and `interests`.
fn register(&mut self, registry: &Registry, token: Token, interests: Interest) -> io::Result<()> {
self.signal_receiver.register(registry, token, interests)
}
/// Re-registers `signal_receiver` with `registry` under `token` and `interests`.
fn reregister(
&mut self,
registry: &Registry,
token: Token,
interests: Interest,
) -> io::Result<()> {
self.signal_receiver.reregister(registry, token, interests)
}
/// Removes `signal_receiver` from `registry`.
fn deregister(&mut self, registry: &Registry) -> io::Result<()> {
self.signal_receiver.deregister(registry)
}
}
use std::sync::atomic::{AtomicBool, Ordering};
/// Async stream view over a borrowed `StatusChannelReceiver`.
///
/// Instances are produced by `StatusChannelReceiver::as_async_status_stream`.
pub struct StatusReceiverStream<'a, T> {
/// The receiver this stream polls.
sync_receiver: &'a StatusChannelReceiver<T>,
/// Set to `true` once `poll_next` sees the channel as disconnected; this is
/// the value reported by `FusedStream::is_terminated`.
terminated: AtomicBool,
}
impl<T> Stream for StatusReceiverStream<'_, T> {
  type Item = T;

  /// Polls the underlying receiver once.
  ///
  /// * A received value yields `Poll::Ready(Some(value))`.
  /// * An empty channel stores the task's waker in the shared slot and yields
  ///   `Poll::Pending`.
  /// * A disconnected channel marks the stream as terminated, logs a warning,
  ///   and yields `Poll::Ready(None)`.
  fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
    use std::sync::mpsc::TryRecvError;

    let receiver = self.sync_receiver;

    // The waker lock is taken before the receive attempt and held until this
    // function returns, so the "empty" check and the waker registration are
    // done under one lock acquisition.
    let mut waker_slot = receiver.get_waker_update_lock();

    match receiver.try_recv() {
      Ok(status) => Poll::Ready(Some(status)),
      Err(TryRecvError::Disconnected) => {
        self.terminated.store(true, Ordering::SeqCst);
        warn!("StatusReceiver channel disconnected");
        Poll::Ready(None)
      }
      Err(TryRecvError::Empty) => {
        *waker_slot = Some(cx.waker().clone());
        Poll::Pending
      }
    }
  }
}
/// Reports termination based on the `terminated` flag, which `poll_next` sets
/// when it finds the channel disconnected.
impl<T> FusedStream for StatusReceiverStream<'_, T> {
/// Returns the current value of the `terminated` flag.
fn is_terminated(&self) -> bool {
self.terminated.load(Ordering::SeqCst)
}
}
/// Status events concerning participants, topics, and endpoints (readers and
/// writers): discovery, loss, matching, and QoS incompatibility.
///
/// The enum is `#[non_exhaustive]`, so matches outside this crate need a
/// wildcard arm. Three variants exist only with the `security` feature.
///
/// NOTE(review): the per-variant descriptions below are derived from variant
/// and field names; the exact conditions under which each event is emitted
/// are decided by code outside this file.
#[derive(Debug, Clone)]
#[non_exhaustive]
pub enum DomainParticipantStatusEvent {
/// A participant was discovered; `dpd` describes it.
ParticipantDiscovered {
dpd: ParticipantDescription,
},
/// The participant with GUID prefix `id` was lost, for the given `reason`.
ParticipantLost {
id: GuidPrefix,
reason: LostReason,
},
/// Two sets of topic data were found to be inconsistent with each other.
InconsistentTopic {
// The "previous" topic data and the GUID recorded as its source.
previous_topic_data: Box<TopicData>, previous_source: GUID,
// The "discovered" topic data and the GUID recorded as its source.
discovered_topic_data: Box<TopicData>, discovery_source: GUID, },
/// A topic was detected; carries its name and type name.
TopicDetected {
name: String,
type_name: String,
},
/// The topic called `name` was lost.
TopicLost {
name: String,
},
/// A reader was detected; `reader` describes the endpoint.
ReaderDetected {
reader: EndpointDescription,
},
/// A writer was detected; `writer` describes the endpoint.
WriterDetected {
writer: EndpointDescription,
},
/// The reader identified by `guid` was lost, for the given `reason`.
ReaderLost {
guid: GUID,
reason: LostReason,
},
/// The writer identified by `guid` was lost, for the given `reason`.
WriterLost {
guid: GUID,
reason: LostReason,
},
/// A local writer was matched with a remote reader.
RemoteReaderMatched {
local_writer: GUID,
remote_reader: GUID,
},
/// A local reader was matched with a remote writer.
RemoteWriterMatched {
local_reader: GUID,
remote_writer: GUID,
},
/// A remote reader's QoS is incompatible with a local writer's; both the
/// requested and the offered policies are attached (boxed).
RemoteReaderQosIncompatible {
local_writer: GUID,
remote_reader: GUID,
requested_qos: Box<QosPolicies>,
offered_qos: Box<QosPolicies>,
},
/// A remote writer's QoS is incompatible with a local reader's; both the
/// requested and the offered policies are attached (boxed).
RemoteWriterQosIncompatible {
local_reader: GUID,
remote_writer: GUID,
requested_qos: Box<QosPolicies>,
offered_qos: Box<QosPolicies>,
},
/// Authentication status for a participant (`security` feature only).
#[cfg(feature = "security")]
Authentication {
participant: GuidPrefix,
status: AuthenticationStatus,
},
/// A participant's identity was revoked (`security` feature only).
#[cfg(feature = "security")]
IdentityRevoked {
participant: GuidPrefix,
},
/// A participant's permissions were revoked (`security` feature only).
#[cfg(feature = "security")]
PermissionsRevoked {
participant: GuidPrefix,
},
}
/// Reason attached to the `ParticipantLost`, `ReaderLost`, and `WriterLost`
/// events of `DomainParticipantStatusEvent`.
#[derive(Debug, Clone)]
pub enum LostReason {
/// The entity was disposed.
/// NOTE(review): presumably an explicit dispose notification — confirm
/// against the code that emits this.
Disposed,
/// The entity timed out.
Timeout {
// NOTE(review): by their names, `lease` is the lease duration that applied
// and `elapsed` is the time that had actually passed — confirm at the
// emitting site.
lease: Duration, elapsed: Duration, },
}
/// Summary of a participant, as carried by
/// `DomainParticipantStatusEvent::ParticipantDiscovered`.
///
/// Every field is copied from an `SpdpDiscoveredParticipantData` by the
/// `From<&SpdpDiscoveredParticipantData>` impl in this file.
#[derive(Debug, Clone)]
pub struct ParticipantDescription {
/// UTC timestamp taken from the discovery data's `updated_time`.
pub updated_time: chrono::DateTime<Utc>,
/// Protocol version reported in the discovery data.
pub protocol_version: ProtocolVersion,
/// Vendor id reported in the discovery data.
pub vendor_id: VendorId,
/// The participant's GUID (`participant_guid` in the discovery data).
pub guid: GUID,
/// Lease duration, if the discovery data has one.
pub lease_duration: Option<Duration>,
/// Entity name, if the discovery data has one.
pub entity_name: Option<String>,
/// Result of `supports_security()` on the discovery data (`security`
/// feature only).
#[cfg(feature = "security")]
pub supports_security: bool,
}
impl From<&SpdpDiscoveredParticipantData> for ParticipantDescription {
fn from(dpd: &SpdpDiscoveredParticipantData) -> Self {
ParticipantDescription {
updated_time: dpd.updated_time,
protocol_version: dpd.protocol_version,
vendor_id: dpd.vendor_id,
guid: dpd.participant_guid,
lease_duration: dpd.lease_duration,
entity_name: dpd.entity_name.clone(),
#[cfg(feature = "security")]
supports_security: dpd.supports_security(),
}
}
}
/// Summary of an endpoint, as carried by the `ReaderDetected` and
/// `WriterDetected` events of `DomainParticipantStatusEvent`.
#[derive(Debug, Clone)]
pub struct EndpointDescription {
/// UTC timestamp associated with this description.
/// NOTE(review): presumably the time the endpoint data was last updated —
/// confirm where the struct is constructed.
pub updated_time: chrono::DateTime<Utc>,
/// GUID of the endpoint.
pub guid: GUID,
/// Name of the endpoint's topic.
pub topic_name: String,
/// Name of the topic's data type.
pub type_name: String,
/// QoS policies of the endpoint.
pub qos: QosPolicies,
}
/// Status changes reported for a data reader.
///
/// NOTE(review): the variant names mirror the DataReader communication
/// statuses of the OMG DDS specification; the descriptions below are derived
/// from names and field types — confirm details against the spec and the
/// emitting code.
#[derive(Debug, Clone)]
pub enum DataReaderStatus {
/// A sample was rejected; `last_reason` gives the kind of the most recent
/// rejection.
SampleRejected {
count: CountWithChange,
last_reason: SampleRejectedStatusKind,
},
/// Liveliness changed; carries separate counters for "alive" and
/// "not alive".
LivelinessChanged {
alive_total: CountWithChange,
not_alive_total: CountWithChange,
},
/// A requested deadline was missed.
RequestedDeadlineMissed {
count: CountWithChange,
},
/// Requested QoS was incompatible with what `writer` offers;
/// `last_policy_id` identifies a policy, and both policy sets are attached
/// (boxed).
RequestedIncompatibleQos {
count: CountWithChange,
last_policy_id: QosPolicyId,
writer: GUID,
requested_qos: Box<QosPolicies>,
offered_qos: Box<QosPolicies>,
},
/// A sample was lost.
SampleLost { count: CountWithChange },
/// The subscription was matched with `writer`; carries a `total` and a
/// `current` counter.
SubscriptionMatched {
total: CountWithChange,
current: CountWithChange,
writer: GUID,
},
}
/// Status changes reported for a data writer.
///
/// NOTE(review): the variant names mirror the DataWriter communication
/// statuses of the OMG DDS specification; the descriptions below are derived
/// from names and field types — confirm details against the spec and the
/// emitting code.
#[derive(Debug, Clone)]
pub enum DataWriterStatus {
/// Liveliness was lost.
LivelinessLost {
count: CountWithChange,
},
/// An offered deadline was missed.
OfferedDeadlineMissed {
count: CountWithChange,
},
/// Offered QoS was incompatible with what `reader` requests;
/// `last_policy_id` identifies a policy, and both policy sets are attached
/// (boxed).
OfferedIncompatibleQos {
count: CountWithChange,
last_policy_id: QosPolicyId,
reader: GUID,
requested_qos: Box<QosPolicies>,
offered_qos: Box<QosPolicies>,
},
/// The publication was matched with `reader`; carries a `total` and a
/// `current` counter.
PublicationMatched {
total: CountWithChange,
current: CountWithChange,
reader: GUID,
},
}
/// A counter value paired with a change value.
///
/// Used as the counter type in `DataReaderStatus` and `DataWriterStatus`.
/// NOTE(review): presumably `count_change` is the change in `count` since
/// the status was last read, as in the DDS specification — confirm.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub struct CountWithChange {
  count: i32,
  count_change: i32,
}

impl CountWithChange {
  /// Crate-internal constructor; stores both values as given.
  pub(crate) fn new(count: i32, count_change: i32) -> Self {
    CountWithChange {
      count,
      count_change,
    }
  }

  /// Public constructor; produces exactly the same value as `new`.
  pub fn start_from(count: i32, count_change: i32) -> Self {
    Self::new(count, count_change)
  }

  /// Returns the stored count.
  pub fn count(&self) -> i32 {
    let Self { count, .. } = *self;
    count
  }

  /// Returns the stored count change.
  pub fn count_change(&self) -> i32 {
    let Self { count_change, .. } = *self;
    count_change
  }
}
/// Kind of sample rejection, used as `last_reason` in
/// `DataReaderStatus::SampleRejected`.
///
/// NOTE(review): the variant names suggest they correspond to resource
/// limits (instances, samples, samples per instance) as in the DDS
/// specification — confirm.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SampleRejectedStatusKind {
/// No rejection.
NotRejected,
/// Rejected because of an instances limit.
ByInstancesLimit,
/// Rejected because of a samples limit.
BySamplesLimit,
/// Rejected because of a samples-per-instance limit.
BySamplesPerInstanceLimit,
}