1use crate::error::ExecutorError;
5use crate::payload::Payload;
6use core::marker::PhantomData;
7use iceoryx2::node::Node;
8use iceoryx2::port::client::Client as IxClient;
9use iceoryx2::port::listener::Listener as IxListener;
10use iceoryx2::port::notifier::Notifier as IxNotifier;
11use iceoryx2::port::server::Server as IxServer;
12use iceoryx2::prelude::*;
13use iceoryx2::response::Response as IxResponse;
14use std::sync::Arc;
15
/// The iceoryx2 service flavour used by the port factories and ports declared in this file.
type IpcService = ipc::Service;

/// Suffix appended to a topic name to form the name of its request-event service.
///
/// `Service::open_or_create` opens that event service; clients notify on it after sending a
/// request and servers listen on it.
pub const REQ_EVENT_SUFFIX: &str = ".__taktora_req_event";

/// Suffix appended to a topic name to form the name of its response-event service.
///
/// `Service::open_or_create` opens that event service; servers notify on it after sending a
/// response and clients listen on it.
pub const RESP_EVENT_SUFFIX: &str = ".__taktora_resp_event";
23
24pub struct Service<Req, Resp>
26where
27 Req: Payload,
28 Resp: Payload,
29{
30 rr: iceoryx2::service::port_factory::request_response::PortFactory<
31 IpcService,
32 Req,
33 (),
34 Resp,
35 (),
36 >,
37 req_event: iceoryx2::service::port_factory::event::PortFactory<IpcService>,
38 resp_event: iceoryx2::service::port_factory::event::PortFactory<IpcService>,
39 _marker: PhantomData<(Req, Resp)>,
40}
41
42impl<Req, Resp> Service<Req, Resp>
43where
44 Req: Payload,
45 Resp: Payload,
46{
47 pub fn open_or_create(
49 node: &Node<IpcService>,
50 topic: &str,
51 ) -> Result<Arc<Self>, ExecutorError> {
52 let rr_name = topic
53 .try_into()
54 .map_err(|e| ExecutorError::Builder(format!("invalid service name: {e:?}")))?;
55 let rr = node
56 .service_builder(&rr_name)
57 .request_response::<Req, Resp>()
58 .open_or_create()
59 .map_err(ExecutorError::iceoryx2)?;
60
61 let make_event = |suffix: &str| -> Result<_, ExecutorError> {
62 let n = format!("{topic}{suffix}");
63 let n = n
64 .as_str()
65 .try_into()
66 .map_err(|e| ExecutorError::Builder(format!("invalid event-topic name: {e:?}")))?;
67 node.service_builder(&n)
68 .event()
69 .open_or_create()
70 .map_err(ExecutorError::iceoryx2)
71 };
72 let req_event = make_event(REQ_EVENT_SUFFIX)?;
73 let resp_event = make_event(RESP_EVENT_SUFFIX)?;
74
75 Ok(Arc::new(Self {
76 rr,
77 req_event,
78 resp_event,
79 _marker: PhantomData,
80 }))
81 }
82
83 pub fn server(self: &Arc<Self>) -> Result<Server<Req, Resp>, ExecutorError> {
85 let inner = self
86 .rr
87 .server_builder()
88 .create()
89 .map_err(ExecutorError::iceoryx2)?;
90 let listener = self
91 .req_event
92 .listener_builder()
93 .create()
94 .map_err(ExecutorError::iceoryx2)?;
95 let resp_notifier = self
96 .resp_event
97 .notifier_builder()
98 .create()
99 .map_err(ExecutorError::iceoryx2)?;
100 #[allow(clippy::arc_with_non_send_sync)]
102 let listener = Arc::new(listener);
103 Ok(Server {
104 inner,
105 listener,
106 resp_notifier,
107 _service: Arc::clone(self),
108 })
109 }
110
111 pub fn client(self: &Arc<Self>) -> Result<Client<Req, Resp>, ExecutorError> {
113 let inner = self
114 .rr
115 .client_builder()
116 .create()
117 .map_err(ExecutorError::iceoryx2)?;
118 let listener = self
119 .resp_event
120 .listener_builder()
121 .create()
122 .map_err(ExecutorError::iceoryx2)?;
123 let req_notifier = self
124 .req_event
125 .notifier_builder()
126 .create()
127 .map_err(ExecutorError::iceoryx2)?;
128 #[allow(clippy::arc_with_non_send_sync)]
130 let listener = Arc::new(listener);
131 Ok(Client {
132 inner,
133 listener,
134 req_notifier,
135 _service: Arc::clone(self),
136 })
137 }
138}
139
140pub struct Server<Req, Resp>
142where
143 Req: Payload,
144 Resp: Payload,
145{
146 inner: IxServer<IpcService, Req, (), Resp, ()>,
147 listener: Arc<IxListener<IpcService>>,
148 resp_notifier: IxNotifier<IpcService>,
149 _service: Arc<Service<Req, Resp>>,
150}
151
152#[allow(unsafe_code, clippy::non_send_fields_in_send_ty)]
162unsafe impl<Req, Resp> Send for Server<Req, Resp>
163where
164 Req: Payload,
165 Resp: Payload,
166{
167}
168
169impl<Req, Resp> Server<Req, Resp>
170where
171 Req: Payload + Copy,
172 Resp: Payload + Copy,
173{
174 #[allow(clippy::type_complexity, clippy::option_if_let_else)]
179 pub fn take_request(
180 &self,
181 ) -> Result<Option<(Req, ActiveRequest<'_, Req, Resp>)>, ExecutorError> {
182 match self.inner.receive().map_err(ExecutorError::iceoryx2)? {
183 None => Ok(None),
184 Some(active) => {
185 let req = *active;
186 Ok(Some((
187 req,
188 ActiveRequest {
189 active,
190 server: self,
191 },
192 )))
193 }
194 }
195 }
196
197 pub fn listener_handle(&self) -> Arc<IxListener<IpcService>> {
199 Arc::clone(&self.listener)
200 }
201}
202
203pub struct ActiveRequest<'s, Req, Resp>
205where
206 Req: Payload,
207 Resp: Payload,
208{
209 active: iceoryx2::active_request::ActiveRequest<IpcService, Req, (), Resp, ()>,
210 server: &'s Server<Req, Resp>,
211}
212
213impl<Req, Resp> ActiveRequest<'_, Req, Resp>
214where
215 Req: Payload + Copy,
216 Resp: Payload + Copy,
217{
218 pub fn respond_copy(self, resp: Resp) -> Result<(), ExecutorError> {
220 let sample = self.active.loan_uninit().map_err(ExecutorError::iceoryx2)?;
221 let sample = sample.write_payload(resp);
222 sample.send().map_err(ExecutorError::iceoryx2)?;
223 self.server
224 .resp_notifier
225 .notify()
226 .map_err(ExecutorError::iceoryx2)?;
227 Ok(())
228 }
229}
230
231pub struct Client<Req, Resp>
233where
234 Req: Payload,
235 Resp: Payload,
236{
237 inner: IxClient<IpcService, Req, (), Resp, ()>,
238 listener: Arc<IxListener<IpcService>>,
239 req_notifier: IxNotifier<IpcService>,
240 _service: Arc<Service<Req, Resp>>,
241}
242
243#[allow(unsafe_code, clippy::non_send_fields_in_send_ty)]
250unsafe impl<Req, Resp> Send for Client<Req, Resp>
251where
252 Req: Payload,
253 Resp: Payload,
254{
255}
256
257impl<Req, Resp> Client<Req, Resp>
258where
259 Req: Payload + Copy,
260 Resp: Payload + Copy,
261{
262 pub fn send_copy(&self, req: Req) -> Result<PendingRequest<Req, Resp>, ExecutorError> {
265 let pending = self.inner.send_copy(req).map_err(ExecutorError::iceoryx2)?;
266 self.req_notifier
267 .notify()
268 .map_err(ExecutorError::iceoryx2)?;
269 Ok(PendingRequest { inner: pending })
270 }
271
272 pub fn listener_handle(&self) -> Arc<IxListener<IpcService>> {
274 Arc::clone(&self.listener)
275 }
276}
277
278pub struct PendingRequest<Req, Resp>
280where
281 Req: Payload,
282 Resp: Payload,
283{
284 inner: iceoryx2::pending_response::PendingResponse<IpcService, Req, (), Resp, ()>,
285}
286
287#[allow(unsafe_code, clippy::non_send_fields_in_send_ty)]
292unsafe impl<Req, Resp> Send for PendingRequest<Req, Resp>
293where
294 Req: Payload,
295 Resp: Payload,
296{
297}
298
299impl<Req, Resp> PendingRequest<Req, Resp>
300where
301 Req: Payload + Copy,
302 Resp: Payload + Copy,
303{
304 pub fn take(&self) -> Result<Option<IxResponse<IpcService, Resp, ()>>, ExecutorError> {
309 self.inner.receive().map_err(ExecutorError::iceoryx2)
310 }
311}