1use core::ptr::NonNull;
14use core::time::Duration;
15
16use alloc::collections::BTreeMap;
17use alloc::vec;
18use alloc::vec::Vec;
19
20pub use iceoryx2_bb_container::semantic_string::SemanticString;
21pub use iceoryx2_bb_system_types::{file_name::FileName, file_path::FilePath, path::Path};
22
23use iceoryx2_bb_concurrency::lazy_lock::LazyLock;
24use iceoryx2_bb_elementary_traits::testing::abandonable::Abandonable;
25use iceoryx2_bb_posix::{
26 file_descriptor::FileDescriptorBased,
27 file_descriptor_set::SynchronousMultiplexing,
28 mutex::{Handle, Mutex, MutexBuilder, MutexHandle},
29 socket_pair::{
30 StreamingSocket, StreamingSocketDuplicateError, StreamingSocketPairCreationError,
31 StreamingSocketPairReceiveError, StreamingSocketPairSendError,
32 },
33};
34use iceoryx2_log::{debug, fail, fatal_panic};
35
36use crate::named_concept::{
37 NamedConceptConfiguration, NamedConceptDoesExistError, NamedConceptListError,
38 NamedConceptRemoveError,
39};
40
41use super::{
42 Event, ListenerCreateError, ListenerWaitError, NamedConcept, NamedConceptBuilder,
43 NamedConceptMgmt, NotifierCreateError, NotifierNotifyError, TriggerId,
44};
45
/// Upper bound on the number of additional non-blocking receive attempts that
/// `Listener::wait_all_impl` performs after its initial wait call.
const MAX_BATCH_SIZE: usize = 512;
47
48#[derive(Debug)]
49struct StorageEntry {
50 notifier: StreamingSocket,
51}
52
53static PROCESS_LOCAL_MTX_HANDLE: LazyLock<MutexHandle<BTreeMap<FilePath, StorageEntry>>> =
54 LazyLock::new(MutexHandle::new);
55
56static PROCESS_LOCAL_STORAGE: LazyLock<Mutex<'static, 'static, BTreeMap<FilePath, StorageEntry>>> =
57 LazyLock::new(|| {
58 fatal_panic!(from "PROCESS_LOCAL_STORAGE",
59 when MutexBuilder::new()
60 .is_interprocess_capable(false)
61 .create(BTreeMap::new(), &PROCESS_LOCAL_MTX_HANDLE),
62 "Failed to create global dynamic storage")
63 });
64
65#[derive(Clone, PartialEq, Eq, Debug)]
66pub struct Configuration {
67 suffix: FileName,
68 prefix: FileName,
69 path: Path,
70}
71
72impl Default for Configuration {
73 fn default() -> Self {
74 Self {
75 path: EventImpl::default_path_hint(),
76 suffix: EventImpl::default_suffix(),
77 prefix: EventImpl::default_prefix(),
78 }
79 }
80}
81
82impl NamedConceptConfiguration for Configuration {
83 fn prefix(mut self, value: &FileName) -> Self {
84 self.prefix = *value;
85 self
86 }
87
88 fn get_prefix(&self) -> &FileName {
89 &self.prefix
90 }
91
92 fn suffix(mut self, value: &FileName) -> Self {
93 self.suffix = *value;
94 self
95 }
96
97 fn path_hint(mut self, value: &Path) -> Self {
98 self.path = *value;
99 self
100 }
101
102 fn get_suffix(&self) -> &FileName {
103 &self.suffix
104 }
105
106 fn get_path_hint(&self) -> &Path {
107 &self.path
108 }
109}
110
/// Field-less type that ties the process-local socket-pair `Notifier`,
/// `Listener` and their builders together through its `Event` and
/// `NamedConceptMgmt` implementations.
#[derive(Debug)]
pub struct EventImpl {}
113
114impl NamedConceptMgmt for EventImpl {
115 type Configuration = Configuration;
116
117 fn does_exist_cfg(
118 name: &FileName,
119 cfg: &Self::Configuration,
120 ) -> Result<bool, NamedConceptDoesExistError> {
121 let msg = "Unable to check if event::process_local_socketpair exists";
122 let origin = "event::process_local_socketpair::Event::does_exist_cfg()";
123
124 let guard = fail!(from origin, when PROCESS_LOCAL_STORAGE.lock(),
125 with NamedConceptDoesExistError::InternalError,
126 "{} since the lock could not be acquired.", msg);
127
128 match guard.get(&cfg.path_for(name)) {
129 Some(_) => Ok(true),
130 None => Ok(false),
131 }
132 }
133
134 fn list_cfg(cfg: &Self::Configuration) -> Result<Vec<FileName>, NamedConceptListError> {
135 let msg = "Unable to list all event::process_local_socketpairs";
136 let origin = "event::process_local_socketpair::Event::list_cfg()";
137
138 let guard = fail!(from origin, when PROCESS_LOCAL_STORAGE.lock(),
139 with NamedConceptListError::InternalError,
140 "{} since the lock could not be acquired.", msg);
141
142 let mut result = vec![];
143 for storage_name in guard.keys() {
144 if let Some(v) = cfg.extract_name_from_path(storage_name) {
145 result.push(v);
146 }
147 }
148
149 Ok(result)
150 }
151
152 unsafe fn remove_cfg(
153 name: &FileName,
154 cfg: &Self::Configuration,
155 ) -> Result<bool, NamedConceptRemoveError> {
156 let storage_name = cfg.path_for(name);
157
158 let msg = "Unable to remove dynamic_storage::process_local";
159 let origin = "dynamic_storage::process_local::Storage::remove_cfg()";
160
161 let mut guard = fail!(from origin, when PROCESS_LOCAL_STORAGE.lock(),
162 with NamedConceptRemoveError::InternalError,
163 "{} since the lock could not be acquired.", msg);
164
165 let entry = guard.get_mut(&storage_name);
166 if entry.is_none() {
167 return Ok(false);
168 }
169
170 Ok(guard.remove(&storage_name).is_some())
171 }
172
173 fn remove_path_hint(
174 _value: &Path,
175 ) -> Result<(), crate::named_concept::NamedConceptPathHintRemoveError> {
176 Ok(())
177 }
178}
179
180impl crate::event::Event for EventImpl {
181 type Notifier = Notifier;
182 type Listener = Listener;
183 type NotifierBuilder = NotifierBuilder;
184 type ListenerBuilder = ListenerBuilder;
185}
186
187#[derive(Debug)]
188pub struct Notifier {
189 socket: StreamingSocket,
190 name: FileName,
191}
192
193impl Abandonable for Notifier {
194 unsafe fn abandon_in_place(mut this: NonNull<Self>) {
195 let this = unsafe { this.as_mut() };
196 unsafe { core::ptr::drop_in_place(&mut this.socket) };
197 }
198}
199
200impl NamedConcept for Notifier {
201 fn name(&self) -> &FileName {
202 &self.name
203 }
204}
205
206impl crate::event::Notifier for Notifier {
207 fn notify(&self, id: TriggerId) -> Result<(), NotifierNotifyError> {
208 let msg = "Unable to send notification";
209 let buffer = unsafe {
210 core::slice::from_raw_parts(
211 (&id as *const TriggerId) as *const u8,
212 core::mem::size_of::<TriggerId>(),
213 )
214 };
215 match self.socket.try_send(buffer) {
216 Ok(number_of_bytes) => {
217 if number_of_bytes == 0 {
218 fail!(from self, with NotifierNotifyError::FailedToDeliverSignal,
219 "{msg} {id:?} since the listener buffer seems to be full.");
220 } else if number_of_bytes == core::mem::size_of::<TriggerId>() {
221 Ok(())
222 } else {
223 fatal_panic!(from self, "This should never happen! {msg} {id:?} could be sent only partially.");
224 }
225 }
226 Err(StreamingSocketPairSendError::Interrupt) => {
227 fail!(from self, with NotifierNotifyError::Interrupt,
228 "{msg} since an interrupt signal was received.");
229 }
230 Err(StreamingSocketPairSendError::ConnectionReset)
231 | Err(StreamingSocketPairSendError::Disconnected) => {
232 fail!(from self, with NotifierNotifyError::Disconnected,
233 "{msg} since the corresponding listener disconnected.");
234 }
235 Err(e) => {
236 fail!(from self, with NotifierNotifyError::InternalFailure,
237 "{msg} due to an unknown failure ({:?}).", e);
238 }
239 }
240 }
241}
242
243#[derive(Debug)]
244pub struct NotifierBuilder {
245 name: FileName,
246 config: Configuration,
247}
248
249impl NamedConceptBuilder<EventImpl> for NotifierBuilder {
250 fn new(name: &FileName) -> Self {
251 Self {
252 name: *name,
253 config: Configuration::default(),
254 }
255 }
256
257 fn config(mut self, config: &Configuration) -> Self {
258 self.config = config.clone();
259 self
260 }
261}
262
263impl crate::event::NotifierBuilder<EventImpl> for NotifierBuilder {
264 fn timeout(self, _timeout: Duration) -> Self {
265 self
266 }
267
268 fn open(self) -> Result<Notifier, NotifierCreateError> {
269 let msg = "Failed to open Notifier";
270 let full_path = self.config.path_for(&self.name);
271
272 let guard = fail!(from self, when PROCESS_LOCAL_STORAGE.lock(),
273 with NotifierCreateError::InternalFailure,
274 "{msg} due to a failure while acquiring the lock.");
275
276 match guard.get(&full_path) {
277 Some(entry) => match entry.notifier.duplicate() {
278 Ok(socket) => Ok(Notifier {
279 name: self.name,
280 socket,
281 }),
282 Err(StreamingSocketDuplicateError::Interrupt) => {
283 fail!(from self, with NotifierCreateError::Interrupt,
284 "{msg} since an interrupt signal was received.");
285 }
286 Err(e) => {
287 fail!(from self, with NotifierCreateError::InternalFailure,
288 "{msg} due to an unknown failure ({:?}).", e);
289 }
290 },
291 None => {
292 fail!(from self, with NotifierCreateError::DoesNotExist,
293 "{msg} since the event does not exist.");
294 }
295 }
296 }
297}
298
299#[derive(Debug)]
300pub struct Listener {
301 name: FileName,
302 socket: StreamingSocket,
303 config: Configuration,
304}
305
306impl Abandonable for Listener {
307 unsafe fn abandon_in_place(mut this: NonNull<Self>) {
308 let this = unsafe { this.as_mut() };
309 unsafe { core::ptr::drop_in_place(&mut this.socket) };
310 }
311}
312
313impl Drop for Listener {
314 fn drop(&mut self) {
315 if let Err(e) = unsafe { EventImpl::remove_cfg(&self.name, &self.config) } {
316 debug!(from self, "Unable to cleanup event after the Listener was dropped ({:?}).", e);
317 }
318 }
319}
320
321impl FileDescriptorBased for Listener {
322 fn file_descriptor(&self) -> &iceoryx2_bb_posix::file_descriptor::FileDescriptor {
323 self.socket.file_descriptor()
324 }
325}
326
327impl SynchronousMultiplexing for Listener {}
328
329impl NamedConcept for Listener {
330 fn name(&self) -> &FileName {
331 &self.name
332 }
333}
334
335impl Listener {
336 fn wait_one_impl<
337 WaitCall: FnMut(&mut [u8]) -> Result<usize, StreamingSocketPairReceiveError>,
338 >(
339 &self,
340 mut waitcall: WaitCall,
341 msg: &str,
342 ) -> Result<Option<TriggerId>, ListenerWaitError> {
343 let trigger_id_size = core::mem::size_of::<TriggerId>();
344 let mut trigger_id = TriggerId::new(0);
345 let raw_trigger_id = unsafe {
346 core::slice::from_raw_parts_mut(
347 ((&mut trigger_id) as *mut TriggerId) as *mut u8,
348 trigger_id_size,
349 )
350 };
351
352 match waitcall(raw_trigger_id) {
353 Ok(number_of_bytes) => {
354 if number_of_bytes == 0 {
355 Ok(None)
356 } else if number_of_bytes == trigger_id_size {
357 Ok(Some(trigger_id))
358 } else {
359 fail!(from self, with ListenerWaitError::ContractViolation,
360 "{msg} due to a contract violation. Expected to receive {} bytes but got {} bytes.",
361 trigger_id_size, number_of_bytes);
362 }
363 }
364 Err(StreamingSocketPairReceiveError::Interrupt) => {
365 fail!(from self, with ListenerWaitError::InterruptSignal,
366 "{msg} since an interrupt signal was received.");
367 }
368 Err(e) => {
369 fail!(from self, with ListenerWaitError::InternalFailure,
370 "{msg} due to an internal failure while receiving data on the underlying streaming socket ({:?}).", e);
371 }
372 }
373 }
374
375 fn wait_all_impl<
376 WaitCall: FnMut(&mut [u8]) -> Result<usize, StreamingSocketPairReceiveError>,
377 F: FnMut(TriggerId),
378 >(
379 &self,
380 mut callback: F,
381 waitcall: WaitCall,
382 msg: &str,
383 ) -> Result<(), ListenerWaitError> {
384 match self.wait_one_impl(waitcall, msg)? {
385 None => return Ok(()),
386 Some(trigger_id) => callback(trigger_id),
387 }
388
389 for _ in 0..MAX_BATCH_SIZE {
390 match self.wait_one_impl(|buffer| self.socket.try_receive(buffer), msg)? {
391 None => return Ok(()),
392 Some(trigger_id) => callback(trigger_id),
393 }
394 }
395
396 Ok(())
397 }
398}
399
400impl crate::event::Listener for Listener {
401 const IS_FILE_DESCRIPTOR_BASED: bool = true;
402
403 fn try_wait_one(&self) -> Result<Option<TriggerId>, ListenerWaitError> {
404 self.wait_one_impl(
405 |buffer| self.socket.try_receive(buffer),
406 "Unable to try to receive a TriggerId",
407 )
408 }
409
410 fn timed_wait_one(
411 &self,
412 timeout: core::time::Duration,
413 ) -> Result<Option<TriggerId>, ListenerWaitError> {
414 self.wait_one_impl(
415 |buffer| self.socket.timed_receive(buffer, timeout),
416 "Unable to receive a TriggerId with a timeout",
417 )
418 }
419
420 fn blocking_wait_one(&self) -> Result<Option<TriggerId>, ListenerWaitError> {
421 self.wait_one_impl(
422 |buffer| self.socket.blocking_receive(buffer),
423 "Unable to block until a TriggerId is received",
424 )
425 }
426
427 fn try_wait_all<F: FnMut(TriggerId)>(&self, callback: F) -> Result<(), ListenerWaitError> {
428 self.wait_all_impl(
429 callback,
430 |buffer| self.socket.try_receive(buffer),
431 "Unable to try to receive all TriggerIds",
432 )
433 }
434
435 fn timed_wait_all<F: FnMut(TriggerId)>(
436 &self,
437 callback: F,
438 timeout: Duration,
439 ) -> Result<(), ListenerWaitError> {
440 self.wait_all_impl(
441 callback,
442 |buffer| self.socket.timed_receive(buffer, timeout),
443 "Unable to receive all TriggerIds with a timeout",
444 )
445 }
446
447 fn blocking_wait_all<F: FnMut(TriggerId)>(&self, callback: F) -> Result<(), ListenerWaitError> {
448 self.wait_all_impl(
449 callback,
450 |buffer| self.socket.blocking_receive(buffer),
451 "Unable to block until all TriggerIds are received",
452 )
453 }
454}
455
456#[derive(Debug)]
457pub struct ListenerBuilder {
458 name: FileName,
459 config: Configuration,
460}
461
462impl NamedConceptBuilder<EventImpl> for ListenerBuilder {
463 fn new(name: &FileName) -> Self {
464 Self {
465 name: *name,
466 config: Configuration::default(),
467 }
468 }
469
470 fn config(mut self, config: &<EventImpl as super::NamedConceptMgmt>::Configuration) -> Self {
471 self.config = config.clone();
472 self
473 }
474}
475
476impl crate::event::ListenerBuilder<EventImpl> for ListenerBuilder {
477 fn trigger_id_max(self, _id: TriggerId) -> Self {
478 self
479 }
480
481 fn create(self) -> Result<Listener, ListenerCreateError> {
482 let msg = "Failed to create Listener";
483 let full_path = self.config.path_for(&self.name);
484
485 let mut guard = fail!(from self, when PROCESS_LOCAL_STORAGE.lock(),
486 with ListenerCreateError::InternalFailure,
487 "{msg} due to a failure while acquiring the lock.");
488 let entry = guard.get_mut(&full_path);
489 if entry.is_some() {
490 fail!(from self, with ListenerCreateError::AlreadyExists,
491 "{msg} since the event already exists.");
492 }
493
494 let (notifier, listener) = match StreamingSocket::create_pair() {
495 Ok((notifier, listener)) => (notifier, listener),
496 Err(StreamingSocketPairCreationError::InsufficientPermissions) => {
497 fail!(from self, with ListenerCreateError::InsufficientPermissions,
498 "{msg} due to insufficient permissions to create a socket pair.");
499 }
500 Err(e) => {
501 fail!(from self, with ListenerCreateError::InternalFailure,
502 "{msg} due to an internal error while creating the socket pair ({:?}).", e);
503 }
504 };
505
506 guard.insert(full_path, StorageEntry { notifier });
507
508 Ok(Listener {
509 name: self.name,
510 socket: listener,
511 config: self.config,
512 })
513 }
514}