Skip to main content

taktora_executor/
service.rs

1//! `Service<Req, Resp>` — iceoryx2 request/response paired with two event
2//! services (one each for request- and response-available wakeups).
3
4use crate::error::ExecutorError;
5use crate::payload::Payload;
6use core::marker::PhantomData;
7use iceoryx2::node::Node;
8use iceoryx2::port::client::Client as IxClient;
9use iceoryx2::port::listener::Listener as IxListener;
10use iceoryx2::port::notifier::Notifier as IxNotifier;
11use iceoryx2::port::server::Server as IxServer;
12use iceoryx2::prelude::*;
13use iceoryx2::response::Response as IxResponse;
14use std::sync::Arc;
15
/// Shorthand for the iceoryx2 `ipc::Service` service type; every port and
/// port factory in this file is parameterised with it.
type IpcService = ipc::Service;
17
/// Suffix appended to a service name to form the request-available event service name.
///
/// `Service::open_or_create` builds the event service name as `{topic}{suffix}`.
/// Clients notify on this event service after sending a request; servers
/// listen on it.
pub const REQ_EVENT_SUFFIX: &str = ".__taktora_req_event";

/// Suffix appended to a service name to form the response-available event service name.
///
/// `Service::open_or_create` builds the event service name as `{topic}{suffix}`.
/// Servers notify on this event service after sending a response; clients
/// listen on it.
pub const RESP_EVENT_SUFFIX: &str = ".__taktora_resp_event";
23
/// Request/response service with two paired event services for wakeup.
///
/// Constructed by `Service::open_or_create`, which hands it out inside an
/// `Arc`. `Service::server` and `Service::client` create their ports from the
/// three port factories stored here.
pub struct Service<Req, Resp>
where
    Req: Payload,
    Resp: Payload,
{
    /// Request/response port factory; the server and client ports come from it.
    rr: iceoryx2::service::port_factory::request_response::PortFactory<
        IpcService,
        Req,
        (),
        Resp,
        (),
    >,
    /// Event port factory for the "request available" wakeup
    /// (service name is the topic plus `REQ_EVENT_SUFFIX`).
    req_event: iceoryx2::service::port_factory::event::PortFactory<IpcService>,
    /// Event port factory for the "response available" wakeup
    /// (service name is the topic plus `RESP_EVENT_SUFFIX`).
    resp_event: iceoryx2::service::port_factory::event::PortFactory<IpcService>,
    /// Zero-sized marker naming both payload types.
    _marker: PhantomData<(Req, Resp)>,
}
41
42impl<Req, Resp> Service<Req, Resp>
43where
44    Req: Payload,
45    Resp: Payload,
46{
47    /// Open or create the service by name, creating the two paired event services.
48    pub fn open_or_create(
49        node: &Node<IpcService>,
50        topic: &str,
51    ) -> Result<Arc<Self>, ExecutorError> {
52        let rr_name = topic
53            .try_into()
54            .map_err(|e| ExecutorError::Builder(format!("invalid service name: {e:?}")))?;
55        let rr = node
56            .service_builder(&rr_name)
57            .request_response::<Req, Resp>()
58            .open_or_create()
59            .map_err(ExecutorError::iceoryx2)?;
60
61        let make_event = |suffix: &str| -> Result<_, ExecutorError> {
62            let n = format!("{topic}{suffix}");
63            let n = n
64                .as_str()
65                .try_into()
66                .map_err(|e| ExecutorError::Builder(format!("invalid event-topic name: {e:?}")))?;
67            node.service_builder(&n)
68                .event()
69                .open_or_create()
70                .map_err(ExecutorError::iceoryx2)
71        };
72        let req_event = make_event(REQ_EVENT_SUFFIX)?;
73        let resp_event = make_event(RESP_EVENT_SUFFIX)?;
74
75        Ok(Arc::new(Self {
76            rr,
77            req_event,
78            resp_event,
79            _marker: PhantomData,
80        }))
81    }
82
83    /// Create a new `Server` that listens for requests on this service.
84    pub fn server(self: &Arc<Self>) -> Result<Server<Req, Resp>, ExecutorError> {
85        let inner = self
86            .rr
87            .server_builder()
88            .create()
89            .map_err(ExecutorError::iceoryx2)?;
90        let listener = self
91            .req_event
92            .listener_builder()
93            .create()
94            .map_err(ExecutorError::iceoryx2)?;
95        let resp_notifier = self
96            .resp_event
97            .notifier_builder()
98            .create()
99            .map_err(ExecutorError::iceoryx2)?;
100        // SAFETY: see `impl Send for Server<Req, Resp>` below.
101        #[allow(clippy::arc_with_non_send_sync)]
102        let listener = Arc::new(listener);
103        Ok(Server {
104            inner,
105            listener,
106            resp_notifier,
107            _service: Arc::clone(self),
108        })
109    }
110
111    /// Create a new `Client` that sends requests on this service.
112    pub fn client(self: &Arc<Self>) -> Result<Client<Req, Resp>, ExecutorError> {
113        let inner = self
114            .rr
115            .client_builder()
116            .create()
117            .map_err(ExecutorError::iceoryx2)?;
118        let listener = self
119            .resp_event
120            .listener_builder()
121            .create()
122            .map_err(ExecutorError::iceoryx2)?;
123        let req_notifier = self
124            .req_event
125            .notifier_builder()
126            .create()
127            .map_err(ExecutorError::iceoryx2)?;
128        // SAFETY: see `impl Send for Client<Req, Resp>` below.
129        #[allow(clippy::arc_with_non_send_sync)]
130        let listener = Arc::new(listener);
131        Ok(Client {
132            inner,
133            listener,
134            req_notifier,
135            _service: Arc::clone(self),
136        })
137    }
138}
139
/// Server side of a `Service<Req, Resp>`. Receives requests and sends responses.
///
/// Created by `Service::server`.
pub struct Server<Req, Resp>
where
    Req: Payload,
    Resp: Payload,
{
    /// iceoryx2 server port; `take_request` polls it via `receive()`.
    inner: IxServer<IpcService, Req, (), Resp, ()>,
    /// Listener on the request event service, kept in an `Arc` so that
    /// `listener_handle` can hand out clones.
    listener: Arc<IxListener<IpcService>>,
    /// Notifier on the response event service; `ActiveRequest::respond_copy`
    /// fires it after sending a response.
    resp_notifier: IxNotifier<IpcService>,
    /// Clone of the owning service's `Arc`, so the `Service` is not dropped
    /// while this server exists. Never read.
    _service: Arc<Service<Req, Resp>>,
}
151
// SAFETY: `IxServer<ipc::Service, …>` is `!Send` because
// `ipc::Service::ArcThreadSafetyPolicy` is `SingleThreaded`, which wraps an
// `Rc`-like interior.  The Rc is only mutated during port creation (constructor)
// and during `update_connections` (called inside `receive()`).  After
// construction, the executor only calls:
//   * `Server::take_request()` — calls `inner.receive()`, which drives
//     `update_connections()` + shared-memory read
//   * `Server::listener_handle()` — cheap `Arc::clone` of our own Arc
// No two threads concurrently touch the Rc, so moving a `Server` is sound.
// We do not implement `Sync`; the struct is move-only across threads.
//
// NOTE(review): the argument above does not mention two things visible in this
// file — confirm they are covered by the same reasoning:
//   * `ActiveRequest::respond_copy` calls `resp_notifier.notify()` through a
//     `&Server` borrow;
//   * `listener_handle()` returns `Arc<IxListener<…>>` clones that a caller can
//     keep on one thread while the `Server` (holding another clone) is moved
//     to a different thread.
#[allow(unsafe_code, clippy::non_send_fields_in_send_ty)]
unsafe impl<Req, Resp> Send for Server<Req, Resp>
where
    Req: Payload,
    Resp: Payload,
{
}
168
169impl<Req, Resp> Server<Req, Resp>
170where
171    Req: Payload + Copy,
172    Resp: Payload + Copy,
173{
174    /// Take the next pending request, if any.
175    ///
176    /// Returns `(payload_copy, ActiveRequest)`. Use the `ActiveRequest` to
177    /// respond via `respond_copy`.
178    #[allow(clippy::type_complexity, clippy::option_if_let_else)]
179    pub fn take_request(
180        &self,
181    ) -> Result<Option<(Req, ActiveRequest<'_, Req, Resp>)>, ExecutorError> {
182        match self.inner.receive().map_err(ExecutorError::iceoryx2)? {
183            None => Ok(None),
184            Some(active) => {
185                let req = *active;
186                Ok(Some((
187                    req,
188                    ActiveRequest {
189                        active,
190                        server: self,
191                    },
192                )))
193            }
194        }
195    }
196
197    /// Borrow the request-event listener (executor uses this for trigger attachment).
198    pub fn listener_handle(&self) -> Arc<IxListener<IpcService>> {
199        Arc::clone(&self.listener)
200    }
201}
202
/// A received request, used to send the response.
///
/// Produced by `Server::take_request`; borrows the server for `'s`.
pub struct ActiveRequest<'s, Req, Resp>
where
    Req: Payload,
    Resp: Payload,
{
    /// The underlying iceoryx2 active request; responses are loaned from it.
    active: iceoryx2::active_request::ActiveRequest<IpcService, Req, (), Resp, ()>,
    /// The server that received this request; gives access to its response notifier.
    server: &'s Server<Req, Resp>,
}
212
213impl<Req, Resp> ActiveRequest<'_, Req, Resp>
214where
215    Req: Payload + Copy,
216    Resp: Payload + Copy,
217{
218    /// Send a response by value and notify the client's listener.
219    pub fn respond_copy(self, resp: Resp) -> Result<(), ExecutorError> {
220        let sample = self.active.loan_uninit().map_err(ExecutorError::iceoryx2)?;
221        let sample = sample.write_payload(resp);
222        sample.send().map_err(ExecutorError::iceoryx2)?;
223        self.server
224            .resp_notifier
225            .notify()
226            .map_err(ExecutorError::iceoryx2)?;
227        Ok(())
228    }
229}
230
/// Client side of a `Service<Req, Resp>`. Sends requests and receives responses.
///
/// Created by `Service::client`.
pub struct Client<Req, Resp>
where
    Req: Payload,
    Resp: Payload,
{
    /// iceoryx2 client port; `send_copy` sends requests through it.
    inner: IxClient<IpcService, Req, (), Resp, ()>,
    /// Listener on the response event service, kept in an `Arc` so that
    /// `listener_handle` can hand out clones.
    listener: Arc<IxListener<IpcService>>,
    /// Notifier on the request event service; `send_copy` fires it after
    /// sending a request.
    req_notifier: IxNotifier<IpcService>,
    /// Clone of the owning service's `Arc`, so the `Service` is not dropped
    /// while this client exists. Never read.
    _service: Arc<Service<Req, Resp>>,
}
242
// SAFETY: same rationale as `Server<Req, Resp>` above.
// `IxClient<ipc::Service, …>` is `!Send` because `SingleThreaded` holds an Rc.
// After construction, only `send_copy` and `listener_handle` are called.
// `send_copy` does not touch the Rc concurrently; `listener_handle` is an
// `Arc::clone`. No concurrent Rc mutation, so moving a `Client` is sound.
// We do not implement `Sync`.
//
// NOTE(review): `send_copy` also calls `req_notifier.notify()`, and
// `listener_handle()` returns `Arc<IxListener<…>>` clones that a caller can
// keep on one thread while the `Client` is moved to another — confirm both
// are covered by the reasoning above.
#[allow(unsafe_code, clippy::non_send_fields_in_send_ty)]
unsafe impl<Req, Resp> Send for Client<Req, Resp>
where
    Req: Payload,
    Resp: Payload,
{
}
256
257impl<Req, Resp> Client<Req, Resp>
258where
259    Req: Payload + Copy,
260    Resp: Payload + Copy,
261{
262    /// Send a request by value. Returns a `PendingRequest` handle for receiving
263    /// the response(s), and notifies the server's listener.
264    pub fn send_copy(&self, req: Req) -> Result<PendingRequest<Req, Resp>, ExecutorError> {
265        let pending = self.inner.send_copy(req).map_err(ExecutorError::iceoryx2)?;
266        self.req_notifier
267            .notify()
268            .map_err(ExecutorError::iceoryx2)?;
269        Ok(PendingRequest { inner: pending })
270    }
271
272    /// Borrow the response-event listener (executor uses this for trigger attachment).
273    pub fn listener_handle(&self) -> Arc<IxListener<IpcService>> {
274        Arc::clone(&self.listener)
275    }
276}
277
/// Handle to an in-flight request — receives the matching response(s).
///
/// Returned by `Client::send_copy`.
pub struct PendingRequest<Req, Resp>
where
    Req: Payload,
    Resp: Payload,
{
    /// The iceoryx2 pending-response handle that `take` polls via `receive()`.
    inner: iceoryx2::pending_response::PendingResponse<IpcService, Req, (), Resp, ()>,
}
286
// SAFETY: `PendingResponse<ipc::Service, …>` is `!Send` for the same
// `SingleThreaded` Rc reason.  After construction, only `receive()` is
// called — through `PendingRequest::take` — (shared-memory read path, no
// concurrent Rc mutation).
// Move-only across threads; no `Sync`.
#[allow(unsafe_code, clippy::non_send_fields_in_send_ty)]
unsafe impl<Req, Resp> Send for PendingRequest<Req, Resp>
where
    Req: Payload,
    Resp: Payload,
{
}
298
299impl<Req, Resp> PendingRequest<Req, Resp>
300where
301    Req: Payload + Copy,
302    Resp: Payload + Copy,
303{
304    /// Try to receive the next response, if one has arrived.
305    ///
306    /// The iceoryx2 0.8.1 `PendingResponse::receive()` returns a
307    /// `Response<IpcService, Resp, ()>`, not a `Sample` — this wraps it.
308    pub fn take(&self) -> Result<Option<IxResponse<IpcService, Resp, ()>>, ExecutorError> {
309        self.inner.receive().map_err(ExecutorError::iceoryx2)
310    }
311}