1use core::{fmt::Debug, marker::PhantomData};
92
93use alloc::vec;
94use alloc::vec::Vec;
95
96use iceoryx2_bb_concurrency::cell::UnsafeCell;
97use iceoryx2_bb_container::string::*;
98use iceoryx2_bb_elementary::{enum_gen, scope_guard::ScopeGuardBuilder};
99use iceoryx2_log::{fail, fatal_panic, warn};
100use iceoryx2_pal_posix::posix::CPU_SETSIZE;
101use iceoryx2_pal_posix::posix::{errno::Errno, MemZeroedStruct};
102use iceoryx2_pal_posix::*;
103
104use crate::{
105 config::MAX_THREAD_NAME_LENGTH,
106 scheduler::Scheduler,
107 signal::Signal,
108 system_configuration::{Limit, SystemInfo},
109};
110
111pub type ThreadName = StaticString<{ MAX_THREAD_NAME_LENGTH - 1 }>;
113
114enum_gen! { ThreadSpawnError
115 entry:
116 InsufficientMemory,
117 InsufficientResources,
118 InvalidSettings,
119 InsufficientPermissions,
120 InvalidGuardSize,
121 ContentionScopeNotSupported,
122 SchedulerPolicyNotSupported,
123 StackSizeTooSmall,
124 ProvidedStackSizeMemoryTooSmall,
125 ProvidedStackMemoryIsNotReadAndWritable,
126 SchedulerPriorityInheritanceNotSupported,
127 ThreadPrioritiesNotSupported,
128 CpuCoreOutsideOfSupportedCpuRangeForAffinity,
129 UnknownError(i32)
130 mapping:
131 ThreadSetNameError
132}
133
134impl core::fmt::Display for ThreadSpawnError {
135 fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
136 write!(f, "ThreadSpawnError::{self:?}")
137 }
138}
139
140impl core::error::Error for ThreadSpawnError {}
141
142#[derive(Debug, Clone, Copy, Eq, Hash, PartialEq)]
143pub enum ThreadSignalError {
144 ThreadNoLongerActive,
145 UnknownError(i32),
146}
147
148impl core::fmt::Display for ThreadSignalError {
149 fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
150 write!(f, "ThreadSignalError::{self:?}")
151 }
152}
153
154impl core::error::Error for ThreadSignalError {}
155
156enum_gen! { ThreadSetNameError
157 entry:
158 UnknownError(i32)
159}
160
161impl core::fmt::Display for ThreadSetNameError {
162 fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
163 write!(f, "ThreadSetNameError::{self:?}")
164 }
165}
166
167impl core::error::Error for ThreadSetNameError {}
168
169#[derive(Debug, Clone, Copy, Eq, Hash, PartialEq)]
170pub enum ThreadSetAffinityError {
171 InvalidCpuCores,
172 UnknownError(i32),
173}
174
175impl core::fmt::Display for ThreadSetAffinityError {
176 fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
177 write!(f, "ThreadSetAffinityError::{self:?}")
178 }
179}
180
181impl core::error::Error for ThreadSetAffinityError {}
182
183enum_gen! {
184 ThreadGetNameError
185 entry:
186 ThreadNameLongerThanMaxSupportedSize,
187 UnknownError(i32)
188}
189
190impl core::fmt::Display for ThreadGetNameError {
191 fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
192 write!(f, "ThreadGetNameError::{self:?}")
193 }
194}
195
196impl core::error::Error for ThreadGetNameError {}
197
198enum_gen! {
199 ThreadError
204 generalization:
205 FailedToSpawn <= ThreadSpawnError,
206 NameSetupFailed <= ThreadSetNameError; ThreadGetNameError,
207 FailedToSignal <= ThreadSignalError,
208 FailedToSetAffinity <= ThreadSetAffinityError
209}
210
211#[derive(Debug)]
213pub struct ThreadBuilder {
214 guard_size: u64,
215 inherit_scheduling_attributes: bool,
216 scheduler: Scheduler,
217 priority: u8,
218 stack_size: Option<u64>,
219 affinity: [bool; posix::CPU_SETSIZE],
220 has_custom_affinity: bool,
221 has_invalid_affinity: bool,
222 name: ThreadName,
223}
224
225impl Default for ThreadBuilder {
226 fn default() -> Self {
227 Self {
228 guard_size: 0,
229 inherit_scheduling_attributes: true,
230 priority: 0,
231 scheduler: Scheduler::default(),
232 affinity: [true; posix::CPU_SETSIZE],
233 has_custom_affinity: false,
234 has_invalid_affinity: false,
235 stack_size: None,
236 name: ThreadName::new(),
237 }
238 }
239}
240
241impl ThreadBuilder {
242 pub fn new() -> Self {
243 Self::default()
244 }
245
246 pub fn name(mut self, value: &ThreadName) -> Self {
249 self.name = *value;
250 self
251 }
252
253 pub fn inherit_scheduling_attributes(mut self, value: bool) -> Self {
255 self.inherit_scheduling_attributes = value;
256 self
257 }
258
259 pub fn affinity(mut self, cpu_core_ids: &[usize]) -> Self {
272 self.affinity = [false; posix::CPU_SETSIZE];
273
274 for cpu_core_id in cpu_core_ids {
275 if *cpu_core_id >= posix::CPU_SETSIZE {
276 self.has_invalid_affinity = true;
277 return self;
278 }
279
280 self.affinity[*cpu_core_id] = true;
281 }
282
283 self.has_custom_affinity = true;
284 self
285 }
286
287 pub fn priority(mut self, value: u8) -> Self {
293 self.priority = value;
294 self
295 }
296
297 pub fn scheduler(mut self, value: Scheduler) -> Self {
299 self.scheduler = value;
300 self
301 }
302
303 pub fn guard_size(mut self, value: u64) -> ThreadGuardedStackBuilder {
309 self.guard_size = value;
310 ThreadGuardedStackBuilder { config: self }
311 }
312
313 pub fn stack_size(mut self, value: u64) -> ThreadGuardedStackBuilder {
326 self.stack_size = Some(value);
327 ThreadGuardedStackBuilder { config: self }
328 }
329
330 pub fn spawn<'thread, T, F>(self, f: F) -> Result<Thread, ThreadSpawnError>
331 where
332 T: Debug + Send + 'thread,
333 F: FnOnce() -> T + Send + 'thread,
334 {
335 self.spawn_impl(f)
336 }
337
338 fn spawn_impl<'thread, T, F>(self, f: F) -> Result<Thread, ThreadSpawnError>
340 where
341 T: Debug + Send + 'thread,
342 F: FnOnce() -> T + Send + 'thread,
343 {
344 if self.has_invalid_affinity {
345 fail!(from self, with ThreadSpawnError::CpuCoreOutsideOfSupportedCpuRangeForAffinity,
346 "Unable to set the threads cpu affinity since the provided core value exceeds the cpu affinity set size of {}.",
347 CPU_SETSIZE);
348 }
349
350 if self.has_custom_affinity {
351 let number_of_cores = SystemInfo::NumberOfCpuCores.value();
352 for (cpu_core_id, has_affinity) in self.affinity[number_of_cores..].iter().enumerate() {
353 if *has_affinity {
354 fail!(from self,
355 with ThreadSpawnError::CpuCoreOutsideOfSupportedCpuRangeForAffinity,
356 "Unable to set the threads affinity since the system has cores from [0, {}] and the cpu core {} was set.",
357 number_of_cores - 1, cpu_core_id);
358 }
359 }
360 }
361
362 let mut attributes = ScopeGuardBuilder::new( posix::pthread_attr_t::new_zeroed())
363 .on_init(|attr| {
364 let msg = "Failed to initialize thread attributes";
365 handle_errno!(ThreadSpawnError, from self,
366 errno_source unsafe {posix::pthread_attr_init(attr).into()},
367 success Errno::ESUCCES => (),
368 Errno::ENOMEM => (InsufficientMemory, "{} due to insufficient memory.", msg),
369 v => (UnknownError(v as i32), "{} since an unknown error occurred ({}).", msg, v)
370 );
371 }).on_drop(|attr| match unsafe { posix::pthread_attr_destroy(attr)} {
372 0 => (),
373 v => {
374 fatal_panic!(from self, "This should never happen! Failed to cleanup thread attributes ({}).",v);
375 }
376 }).create()?;
377
378 let msg = "Failed to set guard size";
379 handle_errno!(ThreadSpawnError, from self,
380 errno_source unsafe { posix::pthread_attr_setguardsize(attributes.get_mut(), self.guard_size as usize).into()},
381 continue_on_success,
382 success Errno::ESUCCES => (),
383 Errno::EINVAL => (InvalidGuardSize, "{} since the guard size value is invalid.", msg),
384 v => (UnknownError(v as i32), "{} since an unknown error occurred ({}).", msg, v)
385 );
386
387 let msg = "Failed to set scheduler priority inheritance settings";
388 handle_errno!(ThreadSpawnError, from self,
389 errno_source unsafe { posix::pthread_attr_setinheritsched(attributes.get_mut(), if self.inherit_scheduling_attributes {
390 posix::PTHREAD_INHERIT_SCHED
391 } else {
392 posix::PTHREAD_EXPLICIT_SCHED
393 }).into()},
394 continue_on_success,
395 success Errno::ESUCCES => (),
396 Errno::ENOSYS => (SchedulerPriorityInheritanceNotSupported, "{} since it is not supported by the system.", msg),
397 Errno::ENOTSUP => (SchedulerPriorityInheritanceNotSupported, "{} since it is not supported by the system.", msg),
398 v => (UnknownError(v as i32), "{} since an unknown error occurred ({}).", msg,v )
399 );
400
401 let msg = "Failed to set scheduler policy";
402 handle_errno!(ThreadSpawnError, from self,
403 errno_source unsafe { posix::pthread_attr_setschedpolicy(attributes.get_mut(), self.scheduler as i32).into()},
404 continue_on_success,
405 success Errno::ESUCCES => (),
406 Errno::ENOSYS => (SchedulerPolicyNotSupported, "{} since it is not supported by the system.", msg),
407 Errno::ENOTSUP => (SchedulerPolicyNotSupported, "{} since it is not supported by the system.", msg),
408 v => (UnknownError(v as i32), "{} since an unknown error occurred ({}).", msg, v)
409 );
410
411 let mut param = posix::sched_param::new_zeroed();
412 param.sched_priority = self.scheduler.policy_specific_priority(self.priority);
413 let msg = "Failed to set thread priority";
414 handle_errno!(ThreadSpawnError, from self,
415 errno_source unsafe { posix::pthread_attr_setschedparam(attributes.get_mut(), ¶m).into() },
416 continue_on_success,
417 success Errno::ESUCCES => (),
418 Errno::ENOTSUP => (ThreadPrioritiesNotSupported, "{} since it is not supported by the system.", msg),
419 v => (UnknownError(v as i32), "{} since an unknown error occurred ({}).", msg,v)
420 );
421
422 if self.stack_size.is_some() {
423 let msg = "Failed to set the threads stack size";
424 let stack_size = *self.stack_size.as_ref().unwrap();
425 let min_stack_size = Limit::MinStackSizeOfThread.value();
426
427 if stack_size < min_stack_size {
428 fail!(from self, with ThreadSpawnError::StackSizeTooSmall,
429 "{} since the stack size is smaller than the minimum required stack size of {}.", msg, min_stack_size);
430 }
431
432 handle_errno!(ThreadSpawnError, from self,
433 errno_source unsafe { posix::pthread_attr_setstacksize(attributes.get_mut(), *self.stack_size.as_ref().unwrap() as usize)
434 .into() },
435 continue_on_success,
436 success Errno::ESUCCES => (),
437 v => (UnknownError(v as i32), "{} since an unknown error occurred ({}).", msg,v)
438 );
439 }
440
441 let mut cpuset = posix::cpu_set_t::new_zeroed();
442 for i in 0..core::cmp::min(posix::CPU_SETSIZE, SystemInfo::NumberOfCpuCores.value()) {
443 if self.affinity[i] {
444 cpuset.set(i);
445 }
446 }
447
448 if posix::support::POSIX_SUPPORT_CPU_AFFINITY {
449 let msg = "Unable to set cpu affinity for thread";
450 handle_errno!(ThreadSpawnError, from self,
451 errno_source unsafe { posix::pthread_attr_setaffinity_np(attributes.get_mut(), core::mem::size_of::<posix::cpu_set_t>(), &cpuset)
452 .into()},
453 continue_on_success,
454 success Errno::ESUCCES => (),
455 Errno::EINVAL => (CpuCoreOutsideOfSupportedCpuRangeForAffinity, "{} since it contains cores greater than the maximum supported number of CPU cores of the system.", msg),
456 Errno::ENOMEM => (InsufficientMemory, "{} due to insufficient memory.", msg),
457 v => (UnknownError(v as i32), "{} since an unknown error occurred ({}).", msg,v)
458 );
459 }
460
461 extern "C" fn start_routine<'thread, FF, TT>(args: *mut posix::void) -> *mut posix::void
462 where
463 TT: Send + Debug + 'thread,
464 FF: FnOnce() -> TT + Send + 'thread,
465 {
466 let t: ThreadStartupArgs<TT, FF> =
467 unsafe { core::ptr::read(args as *const ThreadStartupArgs<TT, FF>) };
468
469 if !t.name.is_empty() {
470 let handle = unsafe { posix::pthread_self() };
471 if unsafe { posix::pthread_setname_np(handle, t.name.as_c_str()) }
472 != Errno::ESUCCES as i32
473 {
474 warn!(from "Thread::spawn", "Unable to set the name of the newly spawned thread to \"{}\".", t.name);
475 }
476 }
477
478 (t.callback)();
479 unsafe { posix::free(args) };
480 core::ptr::null_mut::<posix::void>()
481 }
482
483 let startup_args = unsafe {
484 posix::malloc(core::mem::size_of::<ThreadStartupArgs<T, F>>())
485 as *mut ThreadStartupArgs<T, F>
486 };
487
488 unsafe {
489 startup_args.write(ThreadStartupArgs {
490 callback: f,
491 name: self.name,
492 _data: PhantomData,
493 });
494 }
495
496 let mut handle = posix::pthread_t::new_zeroed();
497
498 let msg = "Unable to create thread";
499 match unsafe {
500 posix::pthread_create(
501 &mut handle,
502 attributes.get(),
503 start_routine::<F, T>,
504 startup_args as *mut posix::void,
505 )
506 .into()
507 } {
508 Errno::ESUCCES => (),
509 Errno::EAGAIN => {
510 fail!(from self, with ThreadSpawnError::InsufficientResources,
511 "{} due to insufficient resources. Maybe the system limit of threads is reached.", msg);
512 }
513 Errno::EINVAL => {
514 fail!(from self, with ThreadSpawnError::InvalidSettings,
515 "{} due to invalid settings for the thread.", msg);
516 }
517 Errno::EPERM => {
518 fail!(from self, with ThreadSpawnError::InsufficientPermissions,
519 "{} due to insufficient permissions to set scheduling settings.", msg);
520 }
521 v => {
522 fail!(from self, with ThreadSpawnError::UnknownError(v as i32),
523 "{} since an unknown error occurred ({}).", msg,v);
524 }
525 };
526
527 Ok(Thread::new(ThreadHandle {
528 handle,
529 name: UnsafeCell::new(self.name),
530 }))
531 }
532}
533
534pub struct ThreadGuardedStackBuilder {
543 config: ThreadBuilder,
544}
545
546impl ThreadGuardedStackBuilder {
547 pub fn guard_size(mut self, value: u64) -> Self {
549 self.config.guard_size = value;
550 self
551 }
552
553 pub fn stack_size(mut self, value: u64) -> Self {
555 self.config.stack_size = Some(value);
556 self
557 }
558
559 pub fn spawn<'thread, T, F>(self, f: F) -> Result<Thread, ThreadSpawnError>
561 where
562 T: Debug + Send + 'thread,
563 F: FnOnce() -> T + Send + 'thread,
564 {
565 self.config.spawn_impl(f)
566 }
567}
568
569pub trait ThreadProperties {
570 fn get_name(&self) -> Result<&ThreadName, ThreadGetNameError>;
572
573 fn get_affinity(&self) -> Result<Vec<usize>, ThreadSetAffinityError>;
576
577 fn set_affinity(&mut self, cpu_core_ids: &[usize]) -> Result<(), ThreadSetAffinityError>;
580}
581
582#[derive(Debug)]
600pub struct ThreadHandle {
601 handle: posix::pthread_t,
602 name: UnsafeCell<ThreadName>,
603}
604
605impl ThreadHandle {
606 pub fn from_self() -> ThreadHandle {
608 ThreadHandle {
609 handle: unsafe { posix::pthread_self() },
610 name: UnsafeCell::new(ThreadName::new()),
611 }
612 }
613}
614
615impl ThreadProperties for ThreadHandle {
616 fn get_name(&self) -> Result<&ThreadName, ThreadGetNameError> {
617 if !unsafe { self.name.get().as_ref().unwrap() }.is_empty() {
618 return Ok(unsafe { self.name.get().as_ref().unwrap() });
619 }
620
621 let mut name: [posix::c_char; MAX_THREAD_NAME_LENGTH] = [0; MAX_THREAD_NAME_LENGTH];
622
623 let msg = "Unable to acquire thread name";
624 match unsafe {
625 posix::pthread_getname_np(self.handle, name.as_mut_ptr(), MAX_THREAD_NAME_LENGTH)
626 }
627 .into()
628 {
629 Errno::ESUCCES => {
630 unsafe {
631 let raw_string = fail!(from self, when ThreadName::from_c_str(name.as_mut_ptr()),
632 with ThreadGetNameError::ThreadNameLongerThanMaxSupportedSize,
633 "{} since it require more characters than the maximum supported length of {}.",
634 msg, ThreadName::capacity());
635
636 *self.name.get() = raw_string
637 };
638 Ok(unsafe { self.name.get().as_ref().unwrap() })
639 }
640 Errno::ERANGE => {
641 fatal_panic!(from self, "{} since the provided buffer is too small. Increase MAX_THREAD_NAME_LENGTH for this platform.", msg);
642 }
643 v => {
644 fail!(from self, with ThreadGetNameError::UnknownError(v as i32),
645 "{} since an unknown error has occurred ({}).", msg, v);
646 }
647 }
648 }
649
650 fn get_affinity(&self) -> Result<Vec<usize>, ThreadSetAffinityError> {
651 let mut cpuset = posix::cpu_set_t::new_zeroed();
652 let msg = "Unable to acquire threads CPU affinity";
653 handle_errno!(ThreadSetAffinityError, from self,
654 errno_source unsafe { posix::pthread_getaffinity_np(self.handle, core::mem::size_of::<posix::cpu_set_t>(), &mut cpuset).into()},
655 continue_on_success,
656 success Errno::ESUCCES => (),
657 Errno::EINVAL => (InvalidCpuCores, "{} since some cpu cores were invalid (maybe exceeded maximum supported CPU core number of the system).", msg ),
658 v => (UnknownError(v as i32), "{} since an unknown error occurred ({}).", msg, v)
659 );
660
661 let mut cpu_affinity_set = vec![];
662 for i in 0..posix::CPU_SETSIZE {
663 if cpuset.has(i) {
664 cpu_affinity_set.push(i);
665 }
666 }
667
668 Ok(cpu_affinity_set)
669 }
670
671 fn set_affinity(&mut self, cpu_core_ids: &[usize]) -> Result<(), ThreadSetAffinityError> {
672 let msg = "Unable to set cpu affinity to core";
673 let number_of_cores = SystemInfo::NumberOfCpuCores.value();
674
675 let mut cpuset = posix::cpu_set_t::new_zeroed();
676 for cpu_core_id in cpu_core_ids {
677 if *cpu_core_id >= posix::CPU_SETSIZE {
678 fail!(from self, with ThreadSetAffinityError::InvalidCpuCores,
679 "{} {} since it exceeds the capacity of the thread affinity mask of {}.",
680 msg, cpu_core_id, posix::CPU_SETSIZE);
681 }
682
683 if *cpu_core_id > number_of_cores {
684 fail!(from self, with ThreadSetAffinityError::InvalidCpuCores,
685 "{} {} since the maximum range of CPUs in the system is [0, {}].",
686 msg, cpu_core_id, number_of_cores - 1);
687 }
688
689 cpuset.set(*cpu_core_id);
690 }
691
692 let msg = "Unable to set cpu affinity";
693 handle_errno!(ThreadSetAffinityError, from self,
694 errno_source unsafe { posix::pthread_setaffinity_np(self.handle, core::mem::size_of::<posix::cpu_set_t>(), &cpuset).into() },
695 success Errno::ESUCCES => (),
696 Errno::EINVAL => (InvalidCpuCores, "{} since some cpu cores were invalid (maybe exceeded maximum supported CPU core number of the system).", msg),
697 v=> (UnknownError(v as i32), "{} since an unknown error occurred ({}).", msg, v)
698 );
699 }
700}
701struct ThreadStartupArgs<'thread, T: Send + Debug + 'thread, F: FnOnce() -> T + Send + 'thread> {
702 callback: F,
703 name: ThreadName,
704 _data: PhantomData<&'thread ()>,
705}
706
707pub struct Thread {
737 handle: ThreadHandle,
738}
739
740impl Debug for Thread {
741 fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
742 write!(f, "Thread {{ handle: {:?} }}", self.handle)
743 }
744}
745
746impl Drop for Thread {
747 fn drop(&mut self) {
748 let msg = "Unable to join thread";
749 match unsafe {
750 posix::pthread_join(
751 self.handle.handle,
752 core::ptr::null_mut::<*mut posix::void>(),
753 )
754 .into()
755 } {
756 Errno::ESUCCES => (),
757 Errno::EDEADLK => {
758 fatal_panic!(from self, "{} since a deadlock was detected.", msg);
759 }
760 Errno::EINVAL => {
761 fatal_panic!(from self, "{} since someone else is already trying to join this thread.", msg);
762 }
763 Errno::ESRCH => {
764 fatal_panic!(from self, "This should never happen! Unable to join thread since its handle is invalid.");
765 }
766 v => {
767 fatal_panic!(from self, "{} since an unknown error occurred ({}).", msg, v);
768 }
769 }
770 }
771}
772
773impl Thread {
774 fn new(handle: ThreadHandle) -> Self {
775 Self { handle }
776 }
777
778 pub fn send_signal(&mut self, signal: Signal) -> Result<(), ThreadSignalError> {
780 let msg = "Unable to send signal";
781 handle_errno!(ThreadSignalError, from self,
782 errno_source unsafe { posix::pthread_kill(self.handle.handle, signal as i32)}.into(),
783 success Errno::ESUCCES => (),
784 Errno::ESRCH => (ThreadNoLongerActive, "{} {:?} since the thread is no longer active.", msg, signal),
785 v => (UnknownError(v as i32), "{} {:?} since an unknown error occurred ({})", msg, signal, v)
786 );
787 }
788}
789
790impl ThreadProperties for Thread {
791 fn get_name(&self) -> Result<&ThreadName, ThreadGetNameError> {
792 self.handle.get_name()
793 }
794
795 fn get_affinity(&self) -> Result<Vec<usize>, ThreadSetAffinityError> {
796 self.handle.get_affinity()
797 }
798
799 fn set_affinity(&mut self, cpu_core_ids: &[usize]) -> Result<(), ThreadSetAffinityError> {
800 self.handle.set_affinity(cpu_core_ids)
801 }
802}