use crate::service_discovery::{SyncError, Tracker};
use iceoryx2::port::ReceiveError;
use iceoryx2::prelude::{AllocationStrategy, ZeroCopySend};
use iceoryx2::{
config::Config as IceoryxConfig,
node::{Node, NodeBuilder, NodeCreationFailure},
port::{
LoanError, SendError,
notifier::{Notifier, NotifierCreateError, NotifierNotifyError},
publisher::{Publisher, PublisherCreateError},
server::Server,
},
prelude::ServiceName,
service::{
Service as ServiceType, ServiceDetails,
builder::{
event::EventOpenOrCreateError, publish_subscribe::PublishSubscribeOpenOrCreateError,
},
port_factory::request_response::PortFactory,
static_config::StaticConfig,
},
};
use iceoryx2_bb_concurrency::lazy_lock::LazyLock;
/// Raw name of the discovery service; `service_name()` passes it to
/// `ServiceName::__internal_new_prefixed` to build the actual `ServiceName`.
const SERVICE_NAME: &str = "discovery/services/";
/// A single change in the set of tracked services.
///
/// `Service::spin` publishes one value of this type per added or removed
/// service, carrying a clone of that service's `StaticConfig`.
#[derive(Debug, ZeroCopySend, serde::Serialize, serde::Deserialize)]
#[allow(dead_code)] #[repr(C)]
pub enum Discovery {
/// A service that the tracker reported as newly added during a sync.
Added(StaticConfig),
/// A service that the tracker reported as removed during a sync.
Removed(StaticConfig),
}
/// Payload type of the discovery publish-subscribe service (used by
/// `Service::create` when building the publisher).
pub type Payload = Discovery;
/// Failures that `Service::create` can report.
#[derive(Debug, Eq, PartialEq, Clone, Copy)]
pub enum CreationError {
/// The initial tracker sync failed with `SyncError::InsufficientPermissions`.
InsufficientPermissions,
/// The underlying node could not be created.
NodeCreationFailure,
/// Opening or creating one of the discovery services (publish-subscribe,
/// event or request-response) failed; also used when the server port
/// cannot be created.
ServiceCreationFailure,
/// The initial tracker sync failed with `SyncError::ServiceLookupFailure`.
SyncFailure,
/// The publisher port could not be created for a reason other than the
/// publisher limit being exceeded.
PublisherCreationError,
/// The publisher limit of the service was exceeded
/// (`PublisherCreateError::ExceedsMaxSupportedPublishers`).
PublisherAlreadyExists,
/// The notifier port could not be created; every `NotifierCreateError`
/// is currently mapped to this variant.
NotifierAlreadyExists,
}
impl core::fmt::Display for CreationError {
    /// Renders the error as `CreationError::<Variant>`, reusing the derived
    /// `Debug` output for the variant part.
    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
        f.write_fmt(format_args!("CreationError::{:?}", self))
    }
}
// Uses the default `Error` methods; `Debug` and `Display` are provided
// elsewhere in this file.
impl core::error::Error for CreationError {}
impl From<NodeCreationFailure> for CreationError {
fn from(_: NodeCreationFailure) -> Self {
CreationError::NodeCreationFailure
}
}
impl From<PublishSubscribeOpenOrCreateError> for CreationError {
fn from(_: PublishSubscribeOpenOrCreateError) -> Self {
CreationError::ServiceCreationFailure
}
}
impl From<PublisherCreateError> for CreationError {
    /// Translates a publisher creation failure.
    ///
    /// Exceeding the publisher limit becomes `PublisherAlreadyExists`; every
    /// other cause becomes `PublisherCreationError`. The match is kept
    /// exhaustive so a new upstream variant forces a decision here.
    fn from(error: PublisherCreateError) -> Self {
        match error {
            PublisherCreateError::ExceedsMaxSupportedPublishers => Self::PublisherAlreadyExists,
            PublisherCreateError::UnableToCreateDataSegment
            | PublisherCreateError::FailedToDeployThreadsafetyPolicy
            | PublisherCreateError::UnableToCreatePortTag => Self::PublisherCreationError,
        }
    }
}
impl From<EventOpenOrCreateError> for CreationError {
fn from(_: EventOpenOrCreateError) -> Self {
CreationError::ServiceCreationFailure
}
}
impl From<NotifierCreateError> for CreationError {
fn from(_: NotifierCreateError) -> Self {
CreationError::NotifierAlreadyExists
}
}
impl From<SyncError> for CreationError {
    /// Translates a tracker sync failure that happened during creation.
    ///
    /// Permission problems keep their meaning; a failed service lookup is
    /// reported as `SyncFailure`.
    fn from(error: SyncError) -> Self {
        match error {
            SyncError::ServiceLookupFailure => Self::SyncFailure,
            SyncError::InsufficientPermissions => Self::InsufficientPermissions,
        }
    }
}
/// Failures that `Service::spin` can report.
#[derive(Debug, Eq, PartialEq, Clone, Copy)]
pub enum SpinError {
/// The tracker sync failed with `SyncError::InsufficientPermissions`.
InsufficientPermissions,
/// The tracker sync failed with `SyncError::ServiceLookupFailure`.
SyncFailure,
/// Loaning or sending a discovery sample failed (`LoanError` / `SendError`).
PublishFailure,
/// Notifying listeners failed (`NotifierNotifyError`).
NotifyFailure,
/// Handling pending discovery requests failed; wraps the server-side error.
ServerSpinError(ServerSpinError),
}
impl core::fmt::Display for SpinError {
    /// Renders the error as `SpinError::<Variant>`, reusing the derived
    /// `Debug` output for the variant part.
    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
        f.write_fmt(format_args!("SpinError::{:?}", self))
    }
}
// Uses the default `Error` methods; `Debug` and `Display` are provided
// elsewhere in this file.
impl core::error::Error for SpinError {}
impl From<SyncError> for SpinError {
    /// Translates a tracker sync failure that happened while spinning.
    ///
    /// Permission problems keep their meaning; a failed service lookup is
    /// reported as `SyncFailure`.
    fn from(error: SyncError) -> Self {
        match error {
            SyncError::ServiceLookupFailure => Self::SyncFailure,
            SyncError::InsufficientPermissions => Self::InsufficientPermissions,
        }
    }
}
impl From<LoanError> for SpinError {
fn from(_: LoanError) -> Self {
SpinError::PublishFailure
}
}
impl From<SendError> for SpinError {
fn from(_: SendError) -> Self {
SpinError::PublishFailure
}
}
impl From<NotifierNotifyError> for SpinError {
fn from(_: NotifierNotifyError) -> Self {
SpinError::NotifyFailure
}
}
impl From<ServerSpinError> for SpinError {
fn from(error: ServerSpinError) -> Self {
SpinError::ServerSpinError(error)
}
}
/// Failures that `Service::handle_discovery_requests` can report.
#[derive(Debug, Eq, PartialEq, Clone, Copy)]
pub enum ServerSpinError {
/// Receiving a pending request failed (`ReceiveError`).
ReceptionFailure,
/// Sending the response failed (`SendError`).
ResponseSendFailure,
/// Loaning the response slice failed (`LoanError`).
LoanFailure,
}
impl core::fmt::Display for ServerSpinError {
    /// Renders the error as `ServerSpinError::<Variant>`, reusing the derived
    /// `Debug` output for the variant part.
    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
        f.write_fmt(format_args!("ServerSpinError::{:?}", self))
    }
}
// Uses the default `Error` methods; `Debug` and `Display` are provided
// elsewhere in this file.
impl core::error::Error for ServerSpinError {}
impl From<ReceiveError> for ServerSpinError {
fn from(_: ReceiveError) -> Self {
ServerSpinError::ReceptionFailure
}
}
impl From<SendError> for ServerSpinError {
fn from(_: SendError) -> Self {
ServerSpinError::ResponseSendFailure
}
}
impl From<LoanError> for ServerSpinError {
fn from(_: LoanError) -> Self {
ServerSpinError::LoanFailure
}
}
/// Settings that decide which facilities the discovery `Service` sets up in
/// `Service::create` and how they are sized.
#[derive(Debug, Clone)]
pub struct Config {
/// When `true`, `Service::create` runs one tracker sync before returning.
pub sync_on_initialization: bool,
/// When `false`, services whose name has the iox2 prefix (as decided by
/// `ServiceName::has_iox2_prefix`) are skipped when publishing changes,
/// invoking callbacks and answering discovery requests.
pub include_internal: bool,
/// When `true`, a publish-subscribe service and publisher are created and
/// every detected change is published as a `Discovery` sample.
pub publish_events: bool,
/// Passed to `max_subscribers` on the publish-subscribe service builder.
pub max_subscribers: usize,
/// Passed to `subscriber_max_buffer_size` on the publish-subscribe service builder.
pub max_buffer_size: usize,
/// Passed to `subscriber_max_borrowed_samples` on the publish-subscribe
/// service builder.
// NOTE: the field name is misspelled ("borrrowed"); it is left as is because
// it is part of the public interface.
pub max_borrrowed_samples: usize,
/// Passed to `history_size` on the publish-subscribe service builder.
pub history_size: usize,
/// When `true`, an event service and notifier are created and listeners are
/// notified after a spin that detected changes.
pub send_notifications: bool,
/// Passed to `max_listeners` on the event service builder.
pub max_listeners: usize,
/// When `true`, a request-response service and server are created so that
/// clients can request the list of known services.
pub enable_server: bool,
/// Passed to `initial_max_slice_len` on the server builder.
pub initial_max_slice_len: usize,
}
impl Default for Config {
    /// Enables every facility (initial sync, internal services, publishing,
    /// notifications, request server) and takes the sizing limits from the
    /// defaults of the iceoryx2 configuration. The initial response slice
    /// length is fixed at 10.
    fn default() -> Self {
        let iceoryx_defaults = iceoryx2::config::Config::default().defaults;
        let pub_sub = &iceoryx_defaults.publish_subscribe;
        let event = &iceoryx_defaults.event;

        Self {
            sync_on_initialization: true,
            include_internal: true,
            publish_events: true,
            max_subscribers: pub_sub.max_subscribers,
            max_buffer_size: pub_sub.subscriber_max_buffer_size,
            max_borrrowed_samples: pub_sub.subscriber_max_borrowed_samples,
            history_size: pub_sub.publisher_history_size,
            send_notifications: true,
            max_listeners: event.max_listeners,
            enable_server: true,
            initial_max_slice_len: 10,
        }
    }
}
/// Discovery service: tracks the services visible through a `Tracker` and,
/// depending on its `Config`, publishes changes, notifies listeners and
/// answers requests for the current service list.
#[allow(dead_code)]
#[derive(Debug)]
pub struct Service<S: ServiceType> {
/// Copy of the discovery settings passed to `create`.
discovery_config: Config,
/// Copy of the iceoryx2 configuration passed to `create`.
iceoryx_config: IceoryxConfig,
/// Node from which all services and ports of this instance were built.
_node: Node<S>,
/// Publisher for `Discovery` samples; `None` when `publish_events` is off.
publisher: Option<Publisher<S, Payload, ()>>,
/// Request-response port factory; `None` when `enable_server` is off.
request_response: Option<PortFactory<S, (), (), [StaticConfig], ()>>,
/// Server answering discovery requests; `None` when `enable_server` is off.
server: Option<Server<S, (), (), [StaticConfig], ()>>,
/// Notifier used after a spin with changes; `None` when `send_notifications` is off.
notifier: Option<Notifier<S>>,
/// Tracks the currently known services and reports additions and removals on sync.
tracker: Tracker<S>,
}
impl<S: ServiceType> Service<S> {
pub fn create(
discovery_config: &Config,
iceoryx_config: &IceoryxConfig,
) -> Result<Self, CreationError> {
let node = NodeBuilder::new().config(iceoryx_config).create::<S>()?;
let mut publisher = None;
if discovery_config.publish_events {
let publish_subscribe = node
.service_builder(service_name())
.publish_subscribe::<Payload>()
.subscriber_max_buffer_size(discovery_config.max_buffer_size)
.subscriber_max_borrowed_samples(discovery_config.max_borrrowed_samples)
.history_size(discovery_config.history_size)
.max_subscribers(discovery_config.max_subscribers)
.max_publishers(1)
.open_or_create()?;
publisher = Some(publish_subscribe.publisher_builder().create()?);
}
let mut notifier = None;
if discovery_config.send_notifications {
let event = node
.service_builder(service_name())
.event()
.max_listeners(discovery_config.max_listeners)
.max_notifiers(1)
.open_or_create()?;
let port = event.notifier_builder().create()?;
notifier = Some(port);
}
let mut tracker = Tracker::<S>::new(iceoryx_config);
if discovery_config.sync_on_initialization {
tracker.sync()?;
}
let (request_response, server) = if discovery_config.enable_server {
let request_response = node
.service_builder(service_name())
.request_response::<(), [StaticConfig]>()
.open_or_create()
.map_err(|_| CreationError::ServiceCreationFailure)?;
let server = Some(
request_response
.server_builder()
.initial_max_slice_len(discovery_config.initial_max_slice_len)
.allocation_strategy(AllocationStrategy::PowerOfTwo)
.create()
.map_err(|_| CreationError::ServiceCreationFailure)?,
);
(Some(request_response), server)
} else {
(None, None)
};
Ok(Service::<S> {
discovery_config: discovery_config.clone(),
iceoryx_config: iceoryx_config.clone(),
_node: node,
publisher,
request_response,
server,
notifier,
tracker,
})
}
pub fn spin<
FAddedService: FnMut(&ServiceDetails<S>),
FRemovedService: FnMut(&ServiceDetails<S>),
>(
&mut self,
mut on_added: FAddedService,
mut on_removed: FRemovedService,
) -> Result<(), SpinError> {
let (added_ids, removed_services) = self.tracker.sync()?;
let changes_detected = !added_ids.is_empty() || !removed_services.is_empty();
for id in &added_ids {
if let Some(service) = self.tracker.get(id) {
if !self.discovery_config.include_internal
&& ServiceName::has_iox2_prefix(service.static_details.name())
{
continue;
}
if let Some(publisher) = &self.publisher {
let sample = publisher.loan_uninit()?;
let sample =
sample.write_payload(Discovery::Added(service.static_details.clone()));
sample.send()?;
}
on_added(service);
}
}
for service in &removed_services {
if !self.discovery_config.include_internal
&& ServiceName::has_iox2_prefix(service.static_details.name())
{
continue;
}
if let Some(publisher) = &self.publisher {
let sample = publisher.loan_uninit()?;
let sample =
sample.write_payload(Discovery::Removed(service.static_details.clone()));
sample.send()?;
}
on_removed(service);
}
if let Some(notifier) = &mut self.notifier {
if changes_detected {
notifier.notify()?;
}
}
self.handle_discovery_requests()?;
Ok(())
}
pub fn handle_discovery_requests(&mut self) -> Result<(), ServerSpinError> {
if let Some(server) = &mut self.server {
while let Some(active_request) = server.receive()? {
let mut service_details = self.tracker.get_all();
if !self.discovery_config.include_internal {
service_details.retain(|&service| {
!ServiceName::has_iox2_prefix(service.static_details.name())
});
}
let response = active_request.loan_slice_uninit(service_details.len())?;
let response =
response.write_from_fn(|idx| (service_details[idx].static_details).clone());
response.send()?;
}
}
Ok(())
}
}
/// Returns the `ServiceName` of the discovery service.
///
/// The name is built from `SERVICE_NAME` on first use and cached in a
/// function-local static for all later calls.
///
/// # Panics
///
/// Panics on first use if `SERVICE_NAME` is rejected by
/// `ServiceName::__internal_new_prefixed`.
pub fn service_name() -> &'static ServiceName {
    /// Builds the name; runs at most once through the lazy static below.
    fn build_service_name() -> ServiceName {
        ServiceName::__internal_new_prefixed(SERVICE_NAME)
            .expect("shouldn't occur: invalid service name for service discovery service")
    }

    static CACHED_NAME: LazyLock<ServiceName> = LazyLock::new(build_service_name);
    &CACHED_NAME
}