use crate::error::ExecutorError;
use crate::payload::Payload;
use core::marker::PhantomData;
use iceoryx2::node::Node;
use iceoryx2::port::client::Client as IxClient;
use iceoryx2::port::listener::Listener as IxListener;
use iceoryx2::port::notifier::Notifier as IxNotifier;
use iceoryx2::port::server::Server as IxServer;
use iceoryx2::prelude::*;
use iceoryx2::response::Response as IxResponse;
use std::sync::Arc;
/// The iceoryx2 service flavour (`ipc::Service`) used by every factory and port in this file.
type IpcService = ipc::Service;
/// Suffix appended to a topic to name the event service on which clients
/// notify servers after sending a request.
pub const REQ_EVENT_SUFFIX: &str = ".__taktora_req_event";
/// Suffix appended to a topic to name the event service on which servers
/// notify clients after sending a response.
pub const RESP_EVENT_SUFFIX: &str = ".__taktora_resp_event";
/// The set of iceoryx2 port factories backing one request/response topic.
///
/// Besides the request-response factory itself, two event factories are kept:
/// one for the topic name extended by [`REQ_EVENT_SUFFIX`] and one for the
/// topic name extended by [`RESP_EVENT_SUFFIX`].
pub struct Service<Req: Payload, Resp: Payload> {
    /// Factory for the request-response ports carrying `Req` / `Resp` payloads.
    rr: iceoryx2::service::port_factory::request_response::PortFactory<
        IpcService,
        Req,
        (),
        Resp,
        (),
    >,
    /// Factory for the event service paired with requests.
    req_event: iceoryx2::service::port_factory::event::PortFactory<IpcService>,
    /// Factory for the event service paired with responses.
    resp_event: iceoryx2::service::port_factory::event::PortFactory<IpcService>,
    /// Zero-sized marker tying the struct to its payload types.
    _marker: PhantomData<(Req, Resp)>,
}
impl<Req: Payload, Resp: Payload> Service<Req, Resp> {
    /// Opens the services behind `topic`, creating any that do not exist yet.
    ///
    /// Three services are opened, in this order: the request-response service
    /// named `topic`, then the event services named `topic` followed by
    /// [`REQ_EVENT_SUFFIX`] and by [`RESP_EVENT_SUFFIX`].
    ///
    /// # Errors
    ///
    /// Returns [`ExecutorError::Builder`] when a name is rejected by the
    /// service-name conversion, and the result of `ExecutorError::iceoryx2`
    /// when iceoryx2 fails to open or create a service.
    pub fn open_or_create(
        node: &Node<IpcService>,
        topic: &str,
    ) -> Result<Arc<Self>, ExecutorError> {
        let service_name = topic
            .try_into()
            .map_err(|e| ExecutorError::Builder(format!("invalid service name: {e:?}")))?;
        let rr = node
            .service_builder(&service_name)
            .request_response::<Req, Resp>()
            .open_or_create()
            .map_err(ExecutorError::iceoryx2)?;

        // Both event services are derived from the topic the same way; only
        // the suffix differs.
        let open_event = |suffix: &str| -> Result<_, ExecutorError> {
            let full_name = [topic, suffix].concat();
            let event_name = full_name
                .as_str()
                .try_into()
                .map_err(|e| {
                    ExecutorError::Builder(format!("invalid event-topic name: {e:?}"))
                })?;
            node.service_builder(&event_name)
                .event()
                .open_or_create()
                .map_err(ExecutorError::iceoryx2)
        };
        let req_event = open_event(REQ_EVENT_SUFFIX)?;
        let resp_event = open_event(RESP_EVENT_SUFFIX)?;

        let service = Self {
            rr,
            req_event,
            resp_event,
            _marker: PhantomData,
        };
        Ok(Arc::new(service))
    }

    /// Creates the server side of this service.
    ///
    /// The returned [`Server`] owns a request-response server port, a listener
    /// on the request event service and a notifier on the response event
    /// service. It also keeps a clone of this `Arc` for as long as it lives.
    ///
    /// # Errors
    ///
    /// Returns the result of `ExecutorError::iceoryx2` if any of the three
    /// ports cannot be created.
    pub fn server(self: &Arc<Self>) -> Result<Server<Req, Resp>, ExecutorError> {
        let port = self
            .rr
            .server_builder()
            .create()
            .map_err(ExecutorError::iceoryx2)?;
        let request_listener = self
            .req_event
            .listener_builder()
            .create()
            .map_err(ExecutorError::iceoryx2)?;
        let resp_notifier = self
            .resp_event
            .notifier_builder()
            .create()
            .map_err(ExecutorError::iceoryx2)?;

        // The listener is shared through `Server::listener_handle`, hence the `Arc`.
        #[allow(clippy::arc_with_non_send_sync)]
        let listener = Arc::new(request_listener);

        Ok(Server {
            inner: port,
            listener,
            resp_notifier,
            _service: Arc::clone(self),
        })
    }

    /// Creates the client side of this service.
    ///
    /// The returned [`Client`] owns a request-response client port, a listener
    /// on the response event service and a notifier on the request event
    /// service. It also keeps a clone of this `Arc` for as long as it lives.
    ///
    /// # Errors
    ///
    /// Returns the result of `ExecutorError::iceoryx2` if any of the three
    /// ports cannot be created.
    pub fn client(self: &Arc<Self>) -> Result<Client<Req, Resp>, ExecutorError> {
        let port = self
            .rr
            .client_builder()
            .create()
            .map_err(ExecutorError::iceoryx2)?;
        let response_listener = self
            .resp_event
            .listener_builder()
            .create()
            .map_err(ExecutorError::iceoryx2)?;
        let req_notifier = self
            .req_event
            .notifier_builder()
            .create()
            .map_err(ExecutorError::iceoryx2)?;

        // The listener is shared through `Client::listener_handle`, hence the `Arc`.
        #[allow(clippy::arc_with_non_send_sync)]
        let listener = Arc::new(response_listener);

        Ok(Client {
            inner: port,
            listener,
            req_notifier,
            _service: Arc::clone(self),
        })
    }
}
/// Server endpoint of a [`Service`]: receives requests and sends responses.
pub struct Server<Req: Payload, Resp: Payload> {
    /// The iceoryx2 request-response server port.
    inner: IxServer<IpcService, Req, (), Resp, ()>,
    /// Listener on the request event service; shared via `listener_handle`.
    listener: Arc<IxListener<IpcService>>,
    /// Notifier on the response event service, fired after a response is sent.
    resp_notifier: IxNotifier<IpcService>,
    /// Keeps the originating service (and its port factories) alive.
    _service: Arc<Service<Req, Resp>>,
}
// Manually asserts that a `Server` may be moved to another thread. The
// `non_send_fields_in_send_ty` allowance suggests at least one field is not
// `Send` on its own.
// NOTE(review): no safety argument is recorded in this file — presumably each
// port is only driven by one thread at a time. Confirm this, in particular
// because `listener_handle` hands out `Arc` clones of the listener.
#[allow(unsafe_code, clippy::non_send_fields_in_send_ty)]
unsafe impl<Req: Payload, Resp: Payload> Send for Server<Req, Resp> {}
impl<Req: Payload + Copy, Resp: Payload + Copy> Server<Req, Resp> {
    /// Polls the server port once for a request.
    ///
    /// Returns `Ok(None)` when the port has nothing to deliver. Otherwise
    /// returns a copy of the request payload together with an
    /// [`ActiveRequest`] that can be used to answer it.
    ///
    /// # Errors
    ///
    /// Returns the result of `ExecutorError::iceoryx2` if receiving fails.
    #[allow(clippy::type_complexity)]
    pub fn take_request(
        &self,
    ) -> Result<Option<(Req, ActiveRequest<'_, Req, Resp>)>, ExecutorError> {
        let Some(active) = self.inner.receive().map_err(ExecutorError::iceoryx2)? else {
            return Ok(None);
        };
        // Copy the payload out before the handle is moved into the wrapper.
        let payload = *active;
        let handle = ActiveRequest {
            active,
            server: self,
        };
        Ok(Some((payload, handle)))
    }

    /// Returns another shared handle to this server's request-event listener.
    pub fn listener_handle(&self) -> Arc<IxListener<IpcService>> {
        let shared = &self.listener;
        Arc::clone(shared)
    }
}
/// A received request that has not been answered yet.
///
/// Borrows the [`Server`] it came from so that the response notifier can be
/// fired once the answer has been sent.
pub struct ActiveRequest<'s, Req: Payload, Resp: Payload> {
    /// The underlying iceoryx2 active request.
    active: iceoryx2::active_request::ActiveRequest<IpcService, Req, (), Resp, ()>,
    /// The server that received this request.
    server: &'s Server<Req, Resp>,
}
impl<Req: Payload + Copy, Resp: Payload + Copy> ActiveRequest<'_, Req, Resp> {
    /// Answers the request with `resp`, then fires the response notifier.
    ///
    /// Consumes the request. The response is sent first; the notification on
    /// the server's response event service follows.
    ///
    /// # Errors
    ///
    /// Returns the result of `ExecutorError::iceoryx2` if loaning the sample,
    /// sending it, or notifying fails. A notification failure is reported
    /// even though the response has already been sent at that point.
    pub fn respond_copy(self, resp: Resp) -> Result<(), ExecutorError> {
        // Loan -> fill -> send, as one pipeline.
        self.active
            .loan_uninit()
            .map_err(ExecutorError::iceoryx2)?
            .write_payload(resp)
            .send()
            .map_err(ExecutorError::iceoryx2)?;

        let notifier = &self.server.resp_notifier;
        notifier.notify().map_err(ExecutorError::iceoryx2)?;
        Ok(())
    }
}
/// Client endpoint of a [`Service`]: sends requests and receives responses.
pub struct Client<Req: Payload, Resp: Payload> {
    /// The iceoryx2 request-response client port.
    inner: IxClient<IpcService, Req, (), Resp, ()>,
    /// Listener on the response event service; shared via `listener_handle`.
    listener: Arc<IxListener<IpcService>>,
    /// Notifier on the request event service, fired after a request is sent.
    req_notifier: IxNotifier<IpcService>,
    /// Keeps the originating service (and its port factories) alive.
    _service: Arc<Service<Req, Resp>>,
}
// Manually asserts that a `Client` may be moved to another thread. The
// `non_send_fields_in_send_ty` allowance suggests at least one field is not
// `Send` on its own.
// NOTE(review): no safety argument is recorded in this file — presumably each
// port is only driven by one thread at a time. Confirm this, in particular
// because `listener_handle` hands out `Arc` clones of the listener.
#[allow(unsafe_code, clippy::non_send_fields_in_send_ty)]
unsafe impl<Req: Payload, Resp: Payload> Send for Client<Req, Resp> {}
impl<Req: Payload + Copy, Resp: Payload + Copy> Client<Req, Resp> {
    /// Sends `req` by copy, then fires the request notifier.
    ///
    /// On success the returned [`PendingRequest`] can be polled for responses.
    ///
    /// # Errors
    ///
    /// Returns the result of `ExecutorError::iceoryx2` if sending or
    /// notifying fails. When only the notification fails, the request has
    /// already been sent but its pending handle is dropped with the error.
    pub fn send_copy(&self, req: Req) -> Result<PendingRequest<Req, Resp>, ExecutorError> {
        let inner = self
            .inner
            .send_copy(req)
            .map_err(ExecutorError::iceoryx2)?;

        let notifier = &self.req_notifier;
        notifier.notify().map_err(ExecutorError::iceoryx2)?;

        Ok(PendingRequest { inner })
    }

    /// Returns another shared handle to this client's response-event listener.
    pub fn listener_handle(&self) -> Arc<IxListener<IpcService>> {
        let shared = &self.listener;
        Arc::clone(shared)
    }
}
/// Handle for a request that has been sent and may still receive responses.
pub struct PendingRequest<Req: Payload, Resp: Payload> {
    /// The underlying iceoryx2 pending response.
    inner: iceoryx2::pending_response::PendingResponse<IpcService, Req, (), Resp, ()>,
}
// Manually asserts that a `PendingRequest` may be moved to another thread.
// The `non_send_fields_in_send_ty` allowance suggests the wrapped pending
// response is not `Send` on its own.
// NOTE(review): no safety argument is recorded in this file — presumably the
// handle is only used by one thread at a time. Confirm against iceoryx2's
// threading requirements.
#[allow(unsafe_code, clippy::non_send_fields_in_send_ty)]
unsafe impl<Req: Payload, Resp: Payload> Send for PendingRequest<Req, Resp> {}
impl<Req: Payload + Copy, Resp: Payload + Copy> PendingRequest<Req, Resp> {
    /// Polls once for a response to this request.
    ///
    /// Forwards to the underlying pending response's `receive` and returns
    /// whatever it yields: `Ok(Some(_))` with a response, or `Ok(None)`.
    ///
    /// # Errors
    ///
    /// Returns the result of `ExecutorError::iceoryx2` if receiving fails.
    pub fn take(&self) -> Result<Option<IxResponse<IpcService, Resp, ()>>, ExecutorError> {
        let polled = self.inner.receive();
        polled.map_err(ExecutorError::iceoryx2)
    }
}