Skip to main content

iceoryx2_cal/event/
process_local_socketpair.rs

1// Copyright (c) 2025 Contributors to the Eclipse Foundation
2//
3// See the NOTICE file(s) distributed with this work for additional
4// information regarding copyright ownership.
5//
6// This program and the accompanying materials are made available under the
7// terms of the Apache Software License 2.0 which is available at
8// https://www.apache.org/licenses/LICENSE-2.0, or the MIT license
9// which is available at https://opensource.org/licenses/MIT.
10//
11// SPDX-License-Identifier: Apache-2.0 OR MIT
12
13use core::ptr::NonNull;
14use core::time::Duration;
15
16use alloc::collections::BTreeMap;
17use alloc::vec;
18use alloc::vec::Vec;
19
20pub use iceoryx2_bb_container::semantic_string::SemanticString;
21pub use iceoryx2_bb_system_types::{file_name::FileName, file_path::FilePath, path::Path};
22
23use iceoryx2_bb_concurrency::lazy_lock::LazyLock;
24use iceoryx2_bb_elementary_traits::testing::abandonable::Abandonable;
25use iceoryx2_bb_posix::{
26    file_descriptor::FileDescriptorBased,
27    file_descriptor_set::SynchronousMultiplexing,
28    mutex::{Handle, Mutex, MutexBuilder, MutexHandle},
29    socket_pair::{
30        StreamingSocket, StreamingSocketDuplicateError, StreamingSocketPairCreationError,
31        StreamingSocketPairReceiveError, StreamingSocketPairSendError,
32    },
33};
34use iceoryx2_log::{debug, fail, fatal_panic};
35
36use crate::named_concept::{
37    NamedConceptConfiguration, NamedConceptDoesExistError, NamedConceptListError,
38    NamedConceptRemoveError,
39};
40
41use super::{
42    Event, ListenerCreateError, ListenerWaitError, NamedConcept, NamedConceptBuilder,
43    NamedConceptMgmt, NotifierCreateError, NotifierNotifyError, TriggerId,
44};
45
46const MAX_BATCH_SIZE: usize = 512;
47
48#[derive(Debug)]
49struct StorageEntry {
50    notifier: StreamingSocket,
51}
52
53static PROCESS_LOCAL_MTX_HANDLE: LazyLock<MutexHandle<BTreeMap<FilePath, StorageEntry>>> =
54    LazyLock::new(MutexHandle::new);
55
56static PROCESS_LOCAL_STORAGE: LazyLock<Mutex<'static, 'static, BTreeMap<FilePath, StorageEntry>>> =
57    LazyLock::new(|| {
58        fatal_panic!(from "PROCESS_LOCAL_STORAGE",
59            when MutexBuilder::new()
60                .is_interprocess_capable(false)
61                .create(BTreeMap::new(), &PROCESS_LOCAL_MTX_HANDLE),
62            "Failed to create global dynamic storage")
63    });
64
65#[derive(Clone, PartialEq, Eq, Debug)]
66pub struct Configuration {
67    suffix: FileName,
68    prefix: FileName,
69    path: Path,
70}
71
72impl Default for Configuration {
73    fn default() -> Self {
74        Self {
75            path: EventImpl::default_path_hint(),
76            suffix: EventImpl::default_suffix(),
77            prefix: EventImpl::default_prefix(),
78        }
79    }
80}
81
82impl NamedConceptConfiguration for Configuration {
83    fn prefix(mut self, value: &FileName) -> Self {
84        self.prefix = *value;
85        self
86    }
87
88    fn get_prefix(&self) -> &FileName {
89        &self.prefix
90    }
91
92    fn suffix(mut self, value: &FileName) -> Self {
93        self.suffix = *value;
94        self
95    }
96
97    fn path_hint(mut self, value: &Path) -> Self {
98        self.path = *value;
99        self
100    }
101
102    fn get_suffix(&self) -> &FileName {
103        &self.suffix
104    }
105
106    fn get_path_hint(&self) -> &Path {
107        &self.path
108    }
109}
110
111#[derive(Debug)]
112pub struct EventImpl {}
113
114impl NamedConceptMgmt for EventImpl {
115    type Configuration = Configuration;
116
117    fn does_exist_cfg(
118        name: &FileName,
119        cfg: &Self::Configuration,
120    ) -> Result<bool, NamedConceptDoesExistError> {
121        let msg = "Unable to check if event::process_local_socketpair exists";
122        let origin = "event::process_local_socketpair::Event::does_exist_cfg()";
123
124        let guard = fail!(from origin, when PROCESS_LOCAL_STORAGE.lock(),
125                        with NamedConceptDoesExistError::InternalError,
126                        "{} since the lock could not be acquired.", msg);
127
128        match guard.get(&cfg.path_for(name)) {
129            Some(_) => Ok(true),
130            None => Ok(false),
131        }
132    }
133
134    fn list_cfg(cfg: &Self::Configuration) -> Result<Vec<FileName>, NamedConceptListError> {
135        let msg = "Unable to list all event::process_local_socketpairs";
136        let origin = "event::process_local_socketpair::Event::list_cfg()";
137
138        let guard = fail!(from origin, when PROCESS_LOCAL_STORAGE.lock(),
139                                with NamedConceptListError::InternalError,
140                                "{} since the lock could not be acquired.", msg);
141
142        let mut result = vec![];
143        for storage_name in guard.keys() {
144            if let Some(v) = cfg.extract_name_from_path(storage_name) {
145                result.push(v);
146            }
147        }
148
149        Ok(result)
150    }
151
152    unsafe fn remove_cfg(
153        name: &FileName,
154        cfg: &Self::Configuration,
155    ) -> Result<bool, NamedConceptRemoveError> {
156        let storage_name = cfg.path_for(name);
157
158        let msg = "Unable to remove dynamic_storage::process_local";
159        let origin = "dynamic_storage::process_local::Storage::remove_cfg()";
160
161        let mut guard = fail!(from origin, when PROCESS_LOCAL_STORAGE.lock(),
162                                with NamedConceptRemoveError::InternalError,
163                                "{} since the lock could not be acquired.", msg);
164
165        let entry = guard.get_mut(&storage_name);
166        if entry.is_none() {
167            return Ok(false);
168        }
169
170        Ok(guard.remove(&storage_name).is_some())
171    }
172
173    fn remove_path_hint(
174        _value: &Path,
175    ) -> Result<(), crate::named_concept::NamedConceptPathHintRemoveError> {
176        Ok(())
177    }
178}
179
180impl crate::event::Event for EventImpl {
181    type Notifier = Notifier;
182    type Listener = Listener;
183    type NotifierBuilder = NotifierBuilder;
184    type ListenerBuilder = ListenerBuilder;
185}
186
187#[derive(Debug)]
188pub struct Notifier {
189    socket: StreamingSocket,
190    name: FileName,
191}
192
193impl Abandonable for Notifier {
194    unsafe fn abandon_in_place(mut this: NonNull<Self>) {
195        let this = unsafe { this.as_mut() };
196        unsafe { core::ptr::drop_in_place(&mut this.socket) };
197    }
198}
199
200impl NamedConcept for Notifier {
201    fn name(&self) -> &FileName {
202        &self.name
203    }
204}
205
206impl crate::event::Notifier for Notifier {
207    fn notify(&self, id: TriggerId) -> Result<(), NotifierNotifyError> {
208        let msg = "Unable to send notification";
209        let buffer = unsafe {
210            core::slice::from_raw_parts(
211                (&id as *const TriggerId) as *const u8,
212                core::mem::size_of::<TriggerId>(),
213            )
214        };
215        match self.socket.try_send(buffer) {
216            Ok(number_of_bytes) => {
217                if number_of_bytes == 0 {
218                    fail!(from self, with NotifierNotifyError::FailedToDeliverSignal,
219                        "{msg} {id:?} since the listener buffer seems to be full.");
220                } else if number_of_bytes == core::mem::size_of::<TriggerId>() {
221                    Ok(())
222                } else {
223                    fatal_panic!(from self, "This should never happen! {msg} {id:?} could be sent only partially.");
224                }
225            }
226            Err(StreamingSocketPairSendError::Interrupt) => {
227                fail!(from self, with NotifierNotifyError::Interrupt,
228                    "{msg} since an interrupt signal was received.");
229            }
230            Err(StreamingSocketPairSendError::ConnectionReset)
231            | Err(StreamingSocketPairSendError::Disconnected) => {
232                fail!(from self, with NotifierNotifyError::Disconnected,
233                    "{msg} since the corresponding listener disconnected.");
234            }
235            Err(e) => {
236                fail!(from self, with NotifierNotifyError::InternalFailure,
237                    "{msg} due to an unknown failure ({:?}).", e);
238            }
239        }
240    }
241}
242
243#[derive(Debug)]
244pub struct NotifierBuilder {
245    name: FileName,
246    config: Configuration,
247}
248
249impl NamedConceptBuilder<EventImpl> for NotifierBuilder {
250    fn new(name: &FileName) -> Self {
251        Self {
252            name: *name,
253            config: Configuration::default(),
254        }
255    }
256
257    fn config(mut self, config: &Configuration) -> Self {
258        self.config = config.clone();
259        self
260    }
261}
262
263impl crate::event::NotifierBuilder<EventImpl> for NotifierBuilder {
264    fn timeout(self, _timeout: Duration) -> Self {
265        self
266    }
267
268    fn open(self) -> Result<Notifier, NotifierCreateError> {
269        let msg = "Failed to open Notifier";
270        let full_path = self.config.path_for(&self.name);
271
272        let guard = fail!(from self, when PROCESS_LOCAL_STORAGE.lock(),
273            with NotifierCreateError::InternalFailure,
274            "{msg} due to a failure while acquiring the lock.");
275
276        match guard.get(&full_path) {
277            Some(entry) => match entry.notifier.duplicate() {
278                Ok(socket) => Ok(Notifier {
279                    name: self.name,
280                    socket,
281                }),
282                Err(StreamingSocketDuplicateError::Interrupt) => {
283                    fail!(from self, with NotifierCreateError::Interrupt,
284                        "{msg} since an interrupt signal was received.");
285                }
286                Err(e) => {
287                    fail!(from self, with NotifierCreateError::InternalFailure,
288                        "{msg} due to an unknown failure ({:?}).", e);
289                }
290            },
291            None => {
292                fail!(from self, with NotifierCreateError::DoesNotExist,
293                    "{msg} since the event does not exist.");
294            }
295        }
296    }
297}
298
299#[derive(Debug)]
300pub struct Listener {
301    name: FileName,
302    socket: StreamingSocket,
303    config: Configuration,
304}
305
306impl Abandonable for Listener {
307    unsafe fn abandon_in_place(mut this: NonNull<Self>) {
308        let this = unsafe { this.as_mut() };
309        unsafe { core::ptr::drop_in_place(&mut this.socket) };
310    }
311}
312
313impl Drop for Listener {
314    fn drop(&mut self) {
315        if let Err(e) = unsafe { EventImpl::remove_cfg(&self.name, &self.config) } {
316            debug!(from self, "Unable to cleanup event after the Listener was dropped ({:?}).", e);
317        }
318    }
319}
320
321impl FileDescriptorBased for Listener {
322    fn file_descriptor(&self) -> &iceoryx2_bb_posix::file_descriptor::FileDescriptor {
323        self.socket.file_descriptor()
324    }
325}
326
327impl SynchronousMultiplexing for Listener {}
328
329impl NamedConcept for Listener {
330    fn name(&self) -> &FileName {
331        &self.name
332    }
333}
334
335impl Listener {
336    fn wait_one_impl<
337        WaitCall: FnMut(&mut [u8]) -> Result<usize, StreamingSocketPairReceiveError>,
338    >(
339        &self,
340        mut waitcall: WaitCall,
341        msg: &str,
342    ) -> Result<Option<TriggerId>, ListenerWaitError> {
343        let trigger_id_size = core::mem::size_of::<TriggerId>();
344        let mut trigger_id = TriggerId::new(0);
345        let raw_trigger_id = unsafe {
346            core::slice::from_raw_parts_mut(
347                ((&mut trigger_id) as *mut TriggerId) as *mut u8,
348                trigger_id_size,
349            )
350        };
351
352        match waitcall(raw_trigger_id) {
353            Ok(number_of_bytes) => {
354                if number_of_bytes == 0 {
355                    Ok(None)
356                } else if number_of_bytes == trigger_id_size {
357                    Ok(Some(trigger_id))
358                } else {
359                    fail!(from self, with ListenerWaitError::ContractViolation,
360                    "{msg} due to a contract violation. Expected to receive {} bytes but got {} bytes.",
361                    trigger_id_size, number_of_bytes);
362                }
363            }
364            Err(StreamingSocketPairReceiveError::Interrupt) => {
365                fail!(from self, with ListenerWaitError::InterruptSignal,
366                    "{msg} since an interrupt signal was received.");
367            }
368            Err(e) => {
369                fail!(from self, with ListenerWaitError::InternalFailure,
370                    "{msg} due to an internal failure while receiving data on the underlying streaming socket ({:?}).", e);
371            }
372        }
373    }
374
375    fn wait_all_impl<
376        WaitCall: FnMut(&mut [u8]) -> Result<usize, StreamingSocketPairReceiveError>,
377        F: FnMut(TriggerId),
378    >(
379        &self,
380        mut callback: F,
381        waitcall: WaitCall,
382        msg: &str,
383    ) -> Result<(), ListenerWaitError> {
384        match self.wait_one_impl(waitcall, msg)? {
385            None => return Ok(()),
386            Some(trigger_id) => callback(trigger_id),
387        }
388
389        for _ in 0..MAX_BATCH_SIZE {
390            match self.wait_one_impl(|buffer| self.socket.try_receive(buffer), msg)? {
391                None => return Ok(()),
392                Some(trigger_id) => callback(trigger_id),
393            }
394        }
395
396        Ok(())
397    }
398}
399
400impl crate::event::Listener for Listener {
401    const IS_FILE_DESCRIPTOR_BASED: bool = true;
402
403    fn try_wait_one(&self) -> Result<Option<TriggerId>, ListenerWaitError> {
404        self.wait_one_impl(
405            |buffer| self.socket.try_receive(buffer),
406            "Unable to try to receive a TriggerId",
407        )
408    }
409
410    fn timed_wait_one(
411        &self,
412        timeout: core::time::Duration,
413    ) -> Result<Option<TriggerId>, ListenerWaitError> {
414        self.wait_one_impl(
415            |buffer| self.socket.timed_receive(buffer, timeout),
416            "Unable to receive a TriggerId with a timeout",
417        )
418    }
419
420    fn blocking_wait_one(&self) -> Result<Option<TriggerId>, ListenerWaitError> {
421        self.wait_one_impl(
422            |buffer| self.socket.blocking_receive(buffer),
423            "Unable to block until a TriggerId is received",
424        )
425    }
426
427    fn try_wait_all<F: FnMut(TriggerId)>(&self, callback: F) -> Result<(), ListenerWaitError> {
428        self.wait_all_impl(
429            callback,
430            |buffer| self.socket.try_receive(buffer),
431            "Unable to try to receive all TriggerIds",
432        )
433    }
434
435    fn timed_wait_all<F: FnMut(TriggerId)>(
436        &self,
437        callback: F,
438        timeout: Duration,
439    ) -> Result<(), ListenerWaitError> {
440        self.wait_all_impl(
441            callback,
442            |buffer| self.socket.timed_receive(buffer, timeout),
443            "Unable to receive all TriggerIds with a timeout",
444        )
445    }
446
447    fn blocking_wait_all<F: FnMut(TriggerId)>(&self, callback: F) -> Result<(), ListenerWaitError> {
448        self.wait_all_impl(
449            callback,
450            |buffer| self.socket.blocking_receive(buffer),
451            "Unable to block until all TriggerIds are received",
452        )
453    }
454}
455
456#[derive(Debug)]
457pub struct ListenerBuilder {
458    name: FileName,
459    config: Configuration,
460}
461
462impl NamedConceptBuilder<EventImpl> for ListenerBuilder {
463    fn new(name: &FileName) -> Self {
464        Self {
465            name: *name,
466            config: Configuration::default(),
467        }
468    }
469
470    fn config(mut self, config: &<EventImpl as super::NamedConceptMgmt>::Configuration) -> Self {
471        self.config = config.clone();
472        self
473    }
474}
475
476impl crate::event::ListenerBuilder<EventImpl> for ListenerBuilder {
477    fn trigger_id_max(self, _id: TriggerId) -> Self {
478        self
479    }
480
481    fn create(self) -> Result<Listener, ListenerCreateError> {
482        let msg = "Failed to create Listener";
483        let full_path = self.config.path_for(&self.name);
484
485        let mut guard = fail!(from self, when PROCESS_LOCAL_STORAGE.lock(),
486            with ListenerCreateError::InternalFailure,
487            "{msg} due to a failure while acquiring the lock.");
488        let entry = guard.get_mut(&full_path);
489        if entry.is_some() {
490            fail!(from self, with ListenerCreateError::AlreadyExists,
491                "{msg} since the event already exists.");
492        }
493
494        let (notifier, listener) = match StreamingSocket::create_pair() {
495            Ok((notifier, listener)) => (notifier, listener),
496            Err(StreamingSocketPairCreationError::InsufficientPermissions) => {
497                fail!(from self, with ListenerCreateError::InsufficientPermissions,
498                    "{msg} due to insufficient permissions to create a socket pair.");
499            }
500            Err(e) => {
501                fail!(from self, with ListenerCreateError::InternalFailure,
502                    "{msg} due to an internal error while creating the socket pair ({:?}).", e);
503            }
504        };
505
506        guard.insert(full_path, StorageEntry { notifier });
507
508        Ok(Listener {
509            name: self.name,
510            socket: listener,
511            config: self.config,
512        })
513    }
514}