use core::{
cell::RefCell, fmt::Debug, hash::Hash, marker::PhantomData, sync::atomic::Ordering,
time::Duration,
};
use std::collections::HashMap;
use iceoryx2_bb_elementary::CallbackProgression;
use iceoryx2_bb_log::fail;
use iceoryx2_bb_posix::{
deadline_queue::{DeadlineQueue, DeadlineQueueBuilder, DeadlineQueueGuard, DeadlineQueueIndex},
file_descriptor::FileDescriptor,
file_descriptor_set::SynchronousMultiplexing,
signal::SignalHandler,
};
use iceoryx2_cal::reactor::*;
use iceoryx2_pal_concurrency_sync::iox_atomic::IoxAtomicUsize;
use crate::signal_handling_mode::SignalHandlingMode;
#[derive(Debug, PartialEq, Eq, Copy, Clone)]
pub enum WaitSetRunResult {
TerminationRequest,
Interrupt,
StopRequest,
AllEventsHandled,
}
#[derive(Debug, PartialEq, Eq, Copy, Clone)]
pub enum WaitSetAttachmentError {
InsufficientCapacity,
AlreadyAttached,
InternalError,
}
impl core::fmt::Display for WaitSetAttachmentError {
fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
write!(f, "WaitSetAttachmentError::{self:?}")
}
}
impl core::error::Error for WaitSetAttachmentError {}
#[derive(Debug, PartialEq, Eq, Copy, Clone)]
pub enum WaitSetRunError {
InsufficientPermissions,
InternalError,
NoAttachments,
}
impl core::fmt::Display for WaitSetRunError {
fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
write!(f, "WaitSetRunError::{self:?}")
}
}
impl core::error::Error for WaitSetRunError {}
#[derive(Debug, PartialEq, Eq, Copy, Clone)]
pub enum WaitSetCreateError {
InternalError,
}
impl core::fmt::Display for WaitSetCreateError {
fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
write!(f, "WaitSetCreateError::{self:?}")
}
}
impl core::error::Error for WaitSetCreateError {}
#[derive(Debug, Clone, Copy, Hash, Eq, PartialEq, PartialOrd, Ord)]
enum AttachmentIdType {
Tick(u64, DeadlineQueueIndex),
Deadline(u64, i32, DeadlineQueueIndex),
Notification(u64, i32),
}
#[derive(Clone, Copy)]
pub struct WaitSetAttachmentId<Service: crate::service::Service> {
attachment_type: AttachmentIdType,
_data: PhantomData<Service>,
}
impl<Service: crate::service::Service> Debug for WaitSetAttachmentId<Service> {
fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
write!(
f,
"WaitSetAttachmentId<{}> {{ attachment_type: {:?} }}",
core::any::type_name::<Service>(),
self.attachment_type
)
}
}
impl<Service: crate::service::Service> WaitSetAttachmentId<Service> {
pub fn from_guard(guard: &WaitSetGuard<Service>) -> Self {
match &guard.guard_type {
GuardType::Tick(t) => WaitSetAttachmentId::tick(guard.waitset, t.index()),
GuardType::Deadline(r, t) => WaitSetAttachmentId::deadline(
guard.waitset,
unsafe { r.file_descriptor().native_handle() },
t.index(),
),
GuardType::Notification(r) => {
WaitSetAttachmentId::notification(guard.waitset, unsafe {
r.file_descriptor().native_handle()
})
}
}
}
}
impl<Service: crate::service::Service> PartialOrd for WaitSetAttachmentId<Service> {
fn partial_cmp(&self, other: &Self) -> Option<core::cmp::Ordering> {
Some(self.cmp(other))
}
}
impl<Service: crate::service::Service> Ord for WaitSetAttachmentId<Service> {
fn cmp(&self, other: &Self) -> core::cmp::Ordering {
self.attachment_type.cmp(&other.attachment_type)
}
}
impl<Service: crate::service::Service> PartialEq for WaitSetAttachmentId<Service> {
fn eq(&self, other: &Self) -> bool {
self.attachment_type == other.attachment_type
}
}
impl<Service: crate::service::Service> Eq for WaitSetAttachmentId<Service> {}
impl<Service: crate::service::Service> Hash for WaitSetAttachmentId<Service> {
fn hash<H: core::hash::Hasher>(&self, state: &mut H) {
self.attachment_type.hash(state)
}
}
impl<Service: crate::service::Service> WaitSetAttachmentId<Service> {
fn tick(waitset: &WaitSet<Service>, deadline_queue_idx: DeadlineQueueIndex) -> Self {
Self {
attachment_type: AttachmentIdType::Tick(
waitset as *const WaitSet<Service> as u64,
deadline_queue_idx,
),
_data: PhantomData,
}
}
fn deadline(
waitset: &WaitSet<Service>,
reactor_idx: i32,
deadline_queue_idx: DeadlineQueueIndex,
) -> Self {
Self {
attachment_type: AttachmentIdType::Deadline(
waitset as *const WaitSet<Service> as u64,
reactor_idx,
deadline_queue_idx,
),
_data: PhantomData,
}
}
fn notification(waitset: &WaitSet<Service>, reactor_idx: i32) -> Self {
Self {
attachment_type: AttachmentIdType::Notification(
waitset as *const WaitSet<Service> as u64,
reactor_idx,
),
_data: PhantomData,
}
}
pub fn has_event_from(&self, other: &WaitSetGuard<Service>) -> bool {
let other_attachment = WaitSetAttachmentId::from_guard(other);
if let AttachmentIdType::Deadline(other_waitset, other_reactor_idx, _) =
other_attachment.attachment_type
{
if let AttachmentIdType::Notification(waitset, reactor_idx) = self.attachment_type {
waitset == other_waitset && reactor_idx == other_reactor_idx
} else {
false
}
} else {
self.attachment_type == other_attachment.attachment_type
}
}
pub fn has_missed_deadline(&self, other: &WaitSetGuard<Service>) -> bool {
if let AttachmentIdType::Deadline(..) = self.attachment_type {
self.attachment_type == WaitSetAttachmentId::from_guard(other).attachment_type
} else {
false
}
}
}
enum GuardType<'waitset, 'attachment, Service: crate::service::Service>
where
Service::Reactor: 'waitset,
{
Tick(DeadlineQueueGuard<'waitset>),
Deadline(
<Service::Reactor as Reactor>::Guard<'waitset, 'attachment>,
DeadlineQueueGuard<'waitset>,
),
Notification(<Service::Reactor as Reactor>::Guard<'waitset, 'attachment>),
}
pub struct WaitSetGuard<'waitset, 'attachment, Service: crate::service::Service>
where
Service::Reactor: 'waitset,
{
waitset: &'waitset WaitSet<Service>,
guard_type: GuardType<'waitset, 'attachment, Service>,
}
impl<Service: crate::service::Service> Drop for WaitSetGuard<'_, '_, Service> {
fn drop(&mut self) {
if let GuardType::Deadline(r, t) = &self.guard_type {
self.waitset
.remove_deadline(unsafe { r.file_descriptor().native_handle() }, t.index())
}
self.waitset.detach();
}
}
#[derive(Default, Debug, Clone)]
pub struct WaitSetBuilder {
signal_handling_mode: SignalHandlingMode,
}
impl WaitSetBuilder {
pub fn new() -> Self {
Self::default()
}
pub fn signal_handling_mode(mut self, value: SignalHandlingMode) -> Self {
self.signal_handling_mode = value;
self
}
pub fn create<Service: crate::service::Service>(
self,
) -> Result<WaitSet<Service>, WaitSetCreateError> {
let msg = "Unable to create WaitSet";
let deadline_queue = fail!(from self, when DeadlineQueueBuilder::new().create(),
with WaitSetCreateError::InternalError,
"{msg} since the underlying Timer could not be created.");
match <Service::Reactor as Reactor>::Builder::new().create() {
Ok(reactor) => Ok(WaitSet {
reactor,
deadline_queue,
attachment_to_deadline: RefCell::new(HashMap::new()),
deadline_to_attachment: RefCell::new(HashMap::new()),
attachment_counter: IoxAtomicUsize::new(0),
signal_handling_mode: self.signal_handling_mode,
}),
Err(ReactorCreateError::UnknownError(e)) => {
fail!(from self, with WaitSetCreateError::InternalError,
"{msg} due to an internal error (error code = {})", e);
}
}
}
}
#[derive(Debug)]
pub struct WaitSet<Service: crate::service::Service> {
reactor: Service::Reactor,
deadline_queue: DeadlineQueue,
attachment_to_deadline: RefCell<HashMap<i32, DeadlineQueueIndex>>,
deadline_to_attachment: RefCell<HashMap<DeadlineQueueIndex, i32>>,
attachment_counter: IoxAtomicUsize,
signal_handling_mode: SignalHandlingMode,
}
impl<Service: crate::service::Service> WaitSet<Service> {
fn detach(&self) {
self.attachment_counter.fetch_sub(1, Ordering::Relaxed);
}
fn attach(&self) -> Result<(), WaitSetAttachmentError> {
if self.len() == self.capacity() {
fail!(from self, with WaitSetAttachmentError::InsufficientCapacity,
"Unable to add attachment since it would exceed the capacity of {}.", self.capacity());
}
self.attachment_counter.fetch_add(1, Ordering::Relaxed);
Ok(())
}
fn remove_deadline(&self, reactor_idx: i32, deadline_queue_idx: DeadlineQueueIndex) {
self.attachment_to_deadline
.borrow_mut()
.remove(&reactor_idx);
self.deadline_to_attachment
.borrow_mut()
.remove(&deadline_queue_idx);
}
fn reset_deadline(
&self,
reactor_idx: i32,
) -> Result<Option<DeadlineQueueIndex>, WaitSetRunError> {
let msg = "Unable to reset deadline";
if let Some(deadline_queue_idx) = self.attachment_to_deadline.borrow().get(&reactor_idx) {
fail!(from self,
when self.deadline_queue.reset(*deadline_queue_idx),
with WaitSetRunError::InternalError,
"{msg} since the deadline_queue guard could not be reset for the attachment {reactor_idx}. Continuing operations will lead to invalid deadline failures.");
Ok(Some(*deadline_queue_idx))
} else {
Ok(None)
}
}
fn handle_deadlines<F: FnMut(WaitSetAttachmentId<Service>) -> CallbackProgression>(
&self,
fn_call: &mut F,
error_msg: &str,
) -> Result<WaitSetRunResult, WaitSetRunError> {
let deadline_to_attachment = self.deadline_to_attachment.borrow();
let mut result = WaitSetRunResult::AllEventsHandled;
let call = |idx: DeadlineQueueIndex| -> CallbackProgression {
let progression = if let Some(reactor_idx) = deadline_to_attachment.get(&idx) {
fn_call(WaitSetAttachmentId::deadline(self, *reactor_idx, idx))
} else {
fn_call(WaitSetAttachmentId::tick(self, idx))
};
if let CallbackProgression::Stop = progression {
result = WaitSetRunResult::StopRequest;
}
progression
};
fail!(from self,
when self.deadline_queue.missed_deadlines(call),
with WaitSetRunError::InternalError,
"{error_msg} since the missed deadlines could not be acquired.");
Ok(result)
}
fn handle_all_attachments<F: FnMut(WaitSetAttachmentId<Service>) -> CallbackProgression>(
&self,
triggered_file_descriptors: &Vec<i32>,
fn_call: &mut F,
error_msg: &str,
) -> Result<WaitSetRunResult, WaitSetRunError> {
let mut fd_and_deadline_queue_idx = Vec::with_capacity(triggered_file_descriptors.len());
for fd in triggered_file_descriptors {
fd_and_deadline_queue_idx.push((fd, self.reset_deadline(*fd)?));
}
match self.handle_deadlines(fn_call, error_msg)? {
WaitSetRunResult::AllEventsHandled => (),
v => return Ok(v),
};
for fd in triggered_file_descriptors {
if let CallbackProgression::Stop = fn_call(WaitSetAttachmentId::notification(self, *fd))
{
return Ok(WaitSetRunResult::StopRequest);
}
}
Ok(WaitSetRunResult::AllEventsHandled)
}
pub fn attach_notification<'waitset, 'attachment, T: SynchronousMultiplexing + Debug>(
&'waitset self,
attachment: &'attachment T,
) -> Result<WaitSetGuard<'waitset, 'attachment, Service>, WaitSetAttachmentError> {
let reactor_guard = self.attach_to_reactor(attachment)?;
self.attach()?;
Ok(WaitSetGuard {
waitset: self,
guard_type: GuardType::Notification(reactor_guard),
})
}
pub fn attach_deadline<'waitset, 'attachment, T: SynchronousMultiplexing + Debug>(
&'waitset self,
attachment: &'attachment T,
deadline: Duration,
) -> Result<WaitSetGuard<'waitset, 'attachment, Service>, WaitSetAttachmentError> {
let reactor_guard = self.attach_to_reactor(attachment)?;
let deadline_queue_guard = self.attach_to_deadline_queue(deadline)?;
let reactor_idx = unsafe { reactor_guard.file_descriptor().native_handle() };
let deadline_idx = deadline_queue_guard.index();
self.attachment_to_deadline
.borrow_mut()
.insert(reactor_idx, deadline_idx);
self.deadline_to_attachment
.borrow_mut()
.insert(deadline_idx, reactor_idx);
self.attach()?;
Ok(WaitSetGuard {
waitset: self,
guard_type: GuardType::Deadline(reactor_guard, deadline_queue_guard),
})
}
pub fn attach_interval(
&self,
interval: Duration,
) -> Result<WaitSetGuard<'_, '_, Service>, WaitSetAttachmentError> {
let deadline_queue_guard = self.attach_to_deadline_queue(interval)?;
self.attach()?;
Ok(WaitSetGuard {
waitset: self,
guard_type: GuardType::Tick(deadline_queue_guard),
})
}
pub fn wait_and_process<F: FnMut(WaitSetAttachmentId<Service>) -> CallbackProgression>(
&self,
mut fn_call: F,
) -> Result<WaitSetRunResult, WaitSetRunError> {
loop {
match self.wait_and_process_once(&mut fn_call) {
Ok(WaitSetRunResult::AllEventsHandled) => (),
Ok(v) => return Ok(v),
Err(e) => {
fail!(from self, with e,
"Unable to run in WaitSet::wait_and_process() loop since ({:?}) has occurred.", e);
}
}
}
}
pub fn wait_and_process_once<F: FnMut(WaitSetAttachmentId<Service>) -> CallbackProgression>(
&self,
fn_call: F,
) -> Result<WaitSetRunResult, WaitSetRunError> {
self.wait_and_process_once_with_timeout(fn_call, Duration::MAX)
}
pub fn wait_and_process_once_with_timeout<
F: FnMut(WaitSetAttachmentId<Service>) -> CallbackProgression,
>(
&self,
mut fn_call: F,
timeout: Duration,
) -> Result<WaitSetRunResult, WaitSetRunError> {
let msg = "Unable to call WaitSet::wait_and_process_once_with_timeout()";
if self.signal_handling_mode == SignalHandlingMode::HandleTerminationRequests
&& SignalHandler::termination_requested()
{
return Ok(WaitSetRunResult::TerminationRequest);
}
if self.is_empty() {
fail!(from self, with WaitSetRunError::NoAttachments,
"{msg} since the WaitSet has no attachments, therefore the call would end up in a deadlock.");
}
let next_timeout = fail!(from self,
when self.deadline_queue.duration_until_next_deadline(),
with WaitSetRunError::InternalError,
"{msg} since the next timeout could not be acquired.");
let next_timeout = next_timeout.min(timeout);
let mut triggered_file_descriptors = vec![];
let collect_triggered_fds = |fd: &FileDescriptor| {
let fd = unsafe { fd.native_handle() };
triggered_file_descriptors.push(fd);
};
let reactor_wait_result = if next_timeout == Duration::MAX {
self.reactor.blocking_wait(collect_triggered_fds)
} else {
self.reactor.timed_wait(collect_triggered_fds, next_timeout)
};
match reactor_wait_result {
Ok(0) => self.handle_deadlines(&mut fn_call, msg),
Ok(_) => self.handle_all_attachments(&triggered_file_descriptors, &mut fn_call, msg),
Err(ReactorWaitError::Interrupt) => Ok(WaitSetRunResult::Interrupt),
Err(ReactorWaitError::InsufficientPermissions) => {
fail!(from self, with WaitSetRunError::InsufficientPermissions,
"{msg} due to insufficient permissions.");
}
Err(ReactorWaitError::UnknownError) => {
fail!(from self, with WaitSetRunError::InternalError,
"{msg} due to an internal error.");
}
}
}
pub fn capacity(&self) -> usize {
self.reactor.capacity()
}
pub fn len(&self) -> usize {
self.attachment_counter.load(Ordering::Relaxed)
}
pub fn is_empty(&self) -> bool {
self.len() == 0
}
pub fn signal_handling_mode(&self) -> SignalHandlingMode {
self.signal_handling_mode
}
fn attach_to_reactor<'waitset, 'attachment, T: SynchronousMultiplexing + Debug>(
&'waitset self,
attachment: &'attachment T,
) -> Result<<Service::Reactor as Reactor>::Guard<'waitset, 'attachment>, WaitSetAttachmentError>
{
let msg = "Unable to attach object to internal reactor";
match self.reactor.attach(attachment) {
Ok(guard) => Ok(guard),
Err(ReactorAttachError::AlreadyAttached) => {
fail!(from self, with WaitSetAttachmentError::AlreadyAttached,
"{msg} {:?} since it is already attached.", attachment);
}
Err(ReactorAttachError::CapacityExceeded) => {
fail!(from self, with WaitSetAttachmentError::AlreadyAttached,
"{msg} {:?} since it would exceed the capacity of {} of the waitset.",
attachment, self.capacity());
}
Err(ReactorAttachError::UnknownError(e)) => {
fail!(from self, with WaitSetAttachmentError::InternalError,
"{msg} {:?} due to an internal error (error code = {})", attachment, e);
}
}
}
fn attach_to_deadline_queue(
&self,
timeout: Duration,
) -> Result<DeadlineQueueGuard<'_>, WaitSetAttachmentError> {
let msg = "Unable to attach timeout to underlying Timer";
match self.deadline_queue.add_deadline_interval(timeout) {
Ok(guard) => Ok(guard),
Err(e) => {
fail!(from self, with WaitSetAttachmentError::InternalError,
"{msg} since the timeout could not be attached to the underlying deadline_queue due to ({:?}).", e);
}
}
}
}