timer_deque_rs/periodic_task/
sync_tasks.rs

1/*-
2 * timer-deque-rs - a Rust crate which provides timer and timer queues based on target OS
3 *  functionality.
4 * 
5 * Copyright (C) 2025 Aleksandr Morozov alex@nixd.org
6 *  4neko.org alex@4neko.org
7 * 
8 * The timer-rs crate can be redistributed and/or modified
9 * under the terms of either of the following licenses:
10 *
11 *   1. the Mozilla Public License Version 2.0 (the “MPL”) OR
12 *                     
13 *   2. The MIT License (MIT)
14 *                     
15 *   3. EUROPEAN UNION PUBLIC LICENCE v. 1.2 EUPL © the European Union 2007, 2016
16 */
17
18use std::
19{
20    cell::OnceCell, 
21    cmp, 
22    collections::HashMap, 
23    fmt, 
24    num::NonZeroUsize, 
25    os::fd::{AsRawFd, RawFd}, 
26    sync::
27    {
28        Arc, Mutex, TryLockError, Weak, atomic::{AtomicBool, Ordering}, mpsc::{self}
29    }, 
30    thread::JoinHandle, 
31    time::Duration
32};
33
34use crossbeam_deque::{Injector, Steal};
35use rand::random_range;
36
37use crate::
38{
39    AbsoluteTime, 
40    FdTimerCom, 
41    RelativeTime, 
42    TimerPoll, 
43    TimerReadRes, 
44    error::{TimerError, TimerErrorType, TimerResult}, 
45    map_timer_err, timer_err, 
46    timer_portable::
47    {
48        PollEventType, PolledTimerFd, TimerExpMode, TimerFlags, TimerType, poll::PollInterrupt, timer::TimerFd
49    }
50};
51
/// The outcome of a single task run, returned by the user-provided task.
/// See the description of each variant.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum PeriodicTaskResult
{
    /// The task ran without errors. Nothing is changed.
    Ok,

    /// Cancels (but does not remove) the task due to an error or for any other reason.
    /// When the worker thread receives this result it queues a suspend request for
    /// the task, so the timer is unset and stops generating events.
    /// If the timer has expired again after `CancelTask` was returned, the task
    /// will be called once more. In that case it is not necessary to return
    /// `CancelTask` again, just return [PeriodicTaskResult::Ok] and exit immediately.
    CancelTask,

    /// A request to reschedule the task to a new time or interval. The `0` field
    /// contains the new time. The timer mode can be changed as well.
    TaskReSchedule(PeriodicTaskTime)
}
71
/// A trait which must be implemented by the instance which is added as a `task`.
/// 
/// * the `instance` must implement [Send] because it may be moved to another thread.
/// 
/// * the `instance` must implement [fmt::Debug] and must be `'static`.
/// 
/// * the `instance` is not required to implement [Sync]: it is stored behind a mutex
///   which is locked while `exec` runs, so it is never executed from two threads
///   at the same time.
/// 
/// A `task` should never block the thread, i.e. it should not call functions which
/// may block for a long period of time!
pub trait PeriodicTask: Send + fmt::Debug + 'static
{
    /// The task entry point, called each time the task is executed. The returned
    /// [PeriodicTaskResult] tells the executor what to do with the task next.
    fn exec(&mut self) -> PeriodicTaskResult;
}
86
/// An alias for a boxed, type-erased [PeriodicTask]. This is the form in which
/// a task is stored by the task system.
pub type PeriodicTaskHndl = Box<dyn PeriodicTask>;
89
90
/// Commands of the global FIFO queue.
/// 
/// In every variant the last field, an [Option] of [mpsc::Sender], is the feedback
/// channel over which the result of the command is reported. If it is set to
/// [Option::None] nothing is reported.
#[derive(Debug)]
pub(crate) enum GlobalTasks
{
    /// Adds a task to the task system. Field `0` is the task name, field `1` is the
    /// timer time and field `2` is the task instance. If an error happens (e.g. a task
    /// with the same name already exists or the timer cannot be created), it is
    /// reported over the sender in field `3` and the task is not added.
    AddTask( String, PeriodicTaskTime, Arc<PeriodicTaskGuardInner>, Option<mpsc::Sender<TimerResult<()>>> ),

    /// Removes the task from the system and stops its timer.
    /// NOTE(review): the visible handler reports only failures over the sender;
    /// a success is not sent back - confirm this is intended.
    RemoveTask( Arc<PeriodicTaskGuardInner>, Option<mpsc::Sender<TimerResult<()>>> ),

    /// Changes the timer value: the timer is stopped and set again to the time
    /// in field `1`. If the [Weak] reference in field `0` cannot be upgraded,
    /// the command is dropped silently.
    ReschedTask( Weak<PeriodicTaskGuardInner>, PeriodicTaskTime, Option<mpsc::Sender<TimerResult<()>>> ),

    /// Suspends the task but does not remove it. A task which is about to be suspended
    /// will still be executed if it is already in the exec queue. If the [Weak]
    /// reference in field `0` cannot be upgraded, the command is dropped silently.
    SuspendTask( Weak<PeriodicTaskGuardInner>, Option<mpsc::Sender<TimerResult<()>>> ),

    /// Resumes the task by enabling its timer.
    ResumeTask( Weak<PeriodicTaskGuardInner>, Option<mpsc::Sender<TimerResult<()>>> ),
}
115
/// Commands of the local (per thread) task queue.
#[derive(Debug)]
pub enum ThreadTask
{
    /// Execute the task which is referenced by the ticket. If the task was
    /// removed while the command was in the queue, the command is skipped.
    TaskExec( Arc<NewTaskTicket> )
}
123
/// A ticket which is assigned to a specific task in order to send the same task to the
/// same thread to which it is already assigned. If only one thread is allocated, then
/// this is not really necessary (will be optimized in future).
#[derive(Debug)]
pub struct NewTaskTicket
{
    /// A thread handler of the specific thread.
    task_thread: Arc<ThreadHandler>,

    /// A [Weak] reference to the task. The worker thread upgrades it right before
    /// the execution; if it is not `upgradable` the execution is skipped.
    ptgi: Weak<PeriodicTaskGuardInner>,
}
134
135impl NewTaskTicket
136{
137    fn new(task_thread: Arc<ThreadHandler>, ptgi: Weak<PeriodicTaskGuardInner>) -> Self
138    {
139        return 
140            Self
141            {
142                task_thread: 
143                    task_thread,
144                ptgi:
145                    ptgi
146            };
147    }
148
149    /// Sends the task on exec to the local queue of the thread.
150    fn send_task(this: Arc<NewTaskTicket>, task_rep_count: u64, thread_hndl_cnt: usize)
151    {
152        let strong_cnt = Arc::strong_count(&this);
153        let thread = this.task_thread.clone();
154
155        for _ in 0..task_rep_count
156        {
157            thread.send_task(this.clone(), strong_cnt < 2 && thread_hndl_cnt > 1);
158        }
159    }
160}
161
/// The internal structure which is stored in pair with the timer's FD. Contains the
/// necessary references and values.
#[derive(Debug)]
pub(crate) struct PeriodicTaskTicket
{
    /// A task name.
    task_name: String,

    /// The timer of the task, already added to the timers poll.
    sync_timer: PolledTimerFd<TimerFd>,

    /// The time which was set to the timer. Needed in case the timer was cancelled by
    /// the OS or was suspended.
    ptt: PeriodicTaskTime,

    /// A [Weak] reference to the current [NewTaskTicket] which identifies to which thread the
    /// task exec was assigned. The [Arc] of [NewTaskTicket] is cloned for each task exec round.
    /// If it is not `upgradable`, the task is no longer assigned to a thread.
    weak_ticket: Weak<NewTaskTicket>,

    /// A [Weak] reference to the [PeriodicTaskGuardInner]. If this reference is not `upgradable` 
    /// then the task is no longer valid.
    ptg: Weak<PeriodicTaskGuardInner>,
}
184
185impl Ord for PeriodicTaskTicket 
186{
187    fn cmp(&self, other: &Self) -> cmp::Ordering 
188    {
189        return 
190            self
191                .sync_timer
192                .get_inner()
193                .as_raw_fd()
194                .cmp(&other.sync_timer.get_inner().as_raw_fd());
195    }
196}
197
198impl PartialOrd for PeriodicTaskTicket 
199{
200    fn partial_cmp(&self, other: &Self) -> Option<cmp::Ordering> 
201    {
202        return Some(self.cmp(other));
203    }
204}
205
206impl PartialEq for PeriodicTaskTicket 
207{
208    fn eq(&self, other: &Self) -> bool 
209    {
210        return 
211            self.task_name == other.task_name && 
212            self.sync_timer.get_inner().as_raw_fd() == other.sync_timer.get_inner().as_raw_fd();
213    }
214}
215
216impl Eq for PeriodicTaskTicket {}
217
218impl PeriodicTaskTicket
219{
220    fn new(task_name: String, ptt: PeriodicTaskTime, ptg: Arc<PeriodicTaskGuardInner>, poll: &TimerPoll) -> TimerResult<Self>
221    {
222        // create timer and add to poll
223        let sync_timer = 
224            TimerFd::new(task_name.clone().into(), TimerType::CLOCK_REALTIME, 
225                TimerFlags::TFD_CLOEXEC | TimerFlags::TFD_NONBLOCK)
226                .map_err(|e|
227                    map_timer_err!(TimerErrorType::TimerError(e.get_errno()), "cannot setup timer for task: '{}'", task_name)
228                )
229                .and_then(|timer| 
230                    poll.add(timer)
231                )?;
232
233        let ptt = 
234            Self
235            {
236                task_name: task_name,
237                sync_timer: sync_timer,
238                ptt: ptt,
239                weak_ticket: Weak::new(),
240                ptg: Arc::downgrade(&ptg),
241            };
242
243        ptt.set_timer()?;
244
245        return Ok(ptt);
246    }
247
248    #[inline]
249    fn set_timer(&self) -> TimerResult<()>
250    {
251        return self.get_timer_time().set_timer(self.sync_timer.get_inner());
252    }
253
254    #[inline]
255    fn unset_timer(&self) -> TimerResult<()>
256    {
257        return 
258            self
259                .sync_timer
260                .get_inner()
261                .get_timer()
262                .unset_time()
263                .map_err(|e|
264                    map_timer_err!(TimerErrorType::TimerError(e.get_errno()), "unsetting timer '{}' returned error: {}", self.sync_timer, e)
265                );
266    }
267
268    fn update_task_time(&mut self, ptt_new: PeriodicTaskTime) -> TimerResult<()>
269    {
270        self.ptt = ptt_new;
271
272        return self.set_timer();
273    } 
274
275    fn get_task_guard(&self) -> TimerResult<Arc<PeriodicTaskGuardInner>>
276    {
277        return 
278            self
279                .ptg
280                .upgrade()
281                .ok_or_else(||
282                    map_timer_err!(TimerErrorType::ReferenceGone, "task: '{}' reference to timer has gone", 
283                        self.task_name)
284                );
285    }
286
287    #[inline]
288    fn get_timer_time(&self) -> &PeriodicTaskTime
289    {
290        return &self.ptt;
291    }
292}
293
/// An instance which is returned by the [SyncPeriodicTasks::add] and which guards the 
/// task and allows to control its state. If the task is no longer needed, the instance
/// can be dropped and the task will be removed from the system.
#[derive(Debug)]
pub struct PeriodicTaskGuard
{
    /// A task name.
    task_name: String,

    /// An [Arc] to the [PeriodicTaskGuardInner] which contains the task itself. 
    /// Normally this is the base instance, i.e. the reference, and if it is dropped
    /// it indicates to the task `executor` that the instance is no longer valid.
    /// The autoremoval is performed by sending the `command` over the global
    /// queue.
    /// 
    /// The [Option] is needed to `take` the instance on drop.
    guard: Option<Arc<PeriodicTaskGuardInner>>,

    /// A cloned instance of the executor spawner which contains the reference 
    /// to the channel.
    spt: Arc<SyncPeriodicTasksInner>
}
316
317impl Drop for PeriodicTaskGuard
318{
319    fn drop(&mut self) 
320    {
321        let guard = self.guard.take().unwrap();
322
323        let _ = self.spt.send_global_cmd(GlobalTasks::RemoveTask(guard, None));
324
325        return;
326    }
327}
328
329impl PeriodicTaskGuard
330{
331    /// Requests the task rescheduling - changine the timer time or mode. 
332    /// The `task` is represented by the calling instance. 
333    /// 
334    /// This function blocks
335    /// the current thread for the maximum (in worst case) 5 seconds which is a 
336    /// timeout for feedback reception.
337    /// 
338    /// # Arguments
339    /// 
340    /// * `ptt` - a new time to be set to timer.
341    /// 
342    /// # Returns 
343    /// 
344    /// A [Result] as alias [TimerResult] is returned.
345    
346    /// In case if error is retuned, the operation should be considered as not completed
347    /// correctly and the executor instance is poisoned i.e a bug happened.
348    /// 
349    /// The common errors may be retuned:
350    /// 
351    /// * [TimerErrorType::MpscTimeout] - a feedback (with the result) reception timeout.
352    ///     Probably this is because the task executor is too busy.
353    /// 
354    /// * [TimerErrorType::TimerError] - with the timer error.
355    /// 
356    /// * [TimerErrorType::NotFound] - if instance was not found in the internal records. 
357    ///     Should not happen. If appears, then probably this is a bug.
358    pub 
359    fn reschedule_task(&self, ptt: PeriodicTaskTime) -> TimerResult<()>
360    {
361        let weak_ptgi = Arc::downgrade(self.guard.as_ref().unwrap());
362
363        let (snd, rcv) = mpsc::channel::<Result<(), TimerError>>();
364
365        self.spt.send_global_cmd(GlobalTasks::ReschedTask(weak_ptgi, ptt, Some(snd)))?;
366
367        return 
368            rcv
369                .recv_timeout(Duration::from_secs(10))
370                .map_err(|e|
371                    map_timer_err!(TimerErrorType::MpscTimeout, "reschedule_task(), task name: '{}', MPSC rcv timeout error: '{}'", 
372                        self.task_name, e)
373                )?;
374    }
375
376    /// Requests to suspent the current `task`.
377    /// 
378    /// This function blocks
379    /// the current thread for the maximum (in worst case) 5 seconds which is a 
380    /// timeout for feedback reception.
381    /// 
382    /// # Returns 
383    /// 
384    /// A [Result] as alias [TimerResult] is returned.
385    /// 
386    /// In case if error is retuned, the operation should be considered as not completed
387    /// correctly and the executor instance is poisoned i.e a bug happened.
388    /// 
389    /// The common errors may be retuned:
390    /// 
391    /// * [TimerErrorType::MpscTimeout] - a feedback (with the result) reception timeout.
392    ///     Probably this is because the task executor is too busy.
393    /// 
394    /// * [TimerErrorType::TimerError] - with the timer error.
395    pub 
396    fn suspend_task(&self) -> TimerResult<()>
397    {
398        let weak_ptgi = Arc::downgrade(self.guard.as_ref().unwrap());
399
400        let (snd, rcv) = mpsc::channel::<Result<(), TimerError>>();
401
402        self.spt.send_global_cmd(GlobalTasks::SuspendTask(weak_ptgi, Some(snd)))?;
403
404        return 
405            rcv
406                .recv_timeout(Duration::from_secs(10))
407                .map_err(|e|
408                    map_timer_err!(TimerErrorType::MpscTimeout, "suspend_task(), task name: '{}', MPSC rcv timeout error: '{}'", 
409                        self.task_name, e)
410                )?;
411    }
412
413    /// Requests to resume the task from the suspend state.
414    /// 
415    /// This function blocks
416    /// the current thread for the maximum (in worst case) 5 seconds which is a 
417    /// timeout for feedback reception.
418    /// 
419    /// # Returns 
420    /// 
421    /// A [Result] as alias [TimerResult] is returned.
422    /// 
423    /// In case if error is retuned, the operation should be considered as not completed
424    /// correctly and the executor instance is poisoned i.e a bug happened.
425    /// 
426    /// The common errors may be retuned:
427    /// 
428    /// * [TimerErrorType::MpscTimeout] - a feedback (with the result) reception timeout.
429    ///     Probably this is because the task executor is too busy.
430    /// 
431    /// * [TimerErrorType::TimerError] - with the timer error.
432    pub 
433    fn resume_task(&self) -> TimerResult<()>
434    {
435        let weak_ptgi = Arc::downgrade(self.guard.as_ref().unwrap());
436
437        let (snd, rcv) = mpsc::channel::<Result<(), TimerError>>();
438
439        self.spt.send_global_cmd(GlobalTasks::ResumeTask(weak_ptgi, Some(snd)))?;
440
441        return 
442            rcv
443                .recv_timeout(Duration::from_secs(10))
444                .map_err(|e|
445                    map_timer_err!(TimerErrorType::MpscTimeout, "resume_task(), task name: '{}', MPSC rcv timeout error: '{}'", 
446                        self.task_name, e)
447                )?;
448    }
449}
450
/// Programs the task's timer to a specific time and mode. This instance is
/// a wrapper around the [TimerExpMode], because that struct requires a generic
/// to be specified which defines the type of the time.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum PeriodicTaskTime
{
    /// The provided time is an absolute time in the future.
    Absolute(TimerExpMode<AbsoluteTime>),

    /// The provided time is relative to the current time.
    Relative(TimerExpMode<RelativeTime>),
}
463
464impl From<TimerExpMode<AbsoluteTime>> for PeriodicTaskTime
465{
466    fn from(value: TimerExpMode<AbsoluteTime>) -> Self 
467    {
468        return Self::Absolute(value);
469    }
470}
471
472impl From<TimerExpMode<RelativeTime>> for PeriodicTaskTime
473{
474    fn from(value: TimerExpMode<RelativeTime>) -> Self 
475    {
476        return Self::Relative(value);
477    }
478}
479
480impl fmt::Display for PeriodicTaskTime
481{
482    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result 
483    {
484        match self
485        {
486            Self::Absolute(t) => 
487                write!(f, "{}", t),
488            Self::Relative(t) => 
489                write!(f, "{}", t),
490        }
491    }
492}
493
494impl PeriodicTaskTime
495{
496    /// The timer is set to time up once and for the specific `absolute` time in future 
497    /// (to current realclock time).
498    #[inline]
499    pub 
500    fn exact_time(abs_time: AbsoluteTime) -> Self
501    {
502        return Self::Absolute(TimerExpMode::<AbsoluteTime>::new_oneshot(abs_time));
503    }
504
505    /// The timer is set to time up in the interval for the `relative` time.
506    #[inline]
507    pub 
508    fn interval(rel_time: RelativeTime) -> Self
509    {
510        return Self::Relative(TimerExpMode::<RelativeTime>::new_interval(rel_time));
511    }
512
513    /// The timer is set to time up in the interval for the `relative` time with the initial
514    /// delay.
515    /// 
516    /// # Arguments
517    /// 
518    /// * `start_del_time` - a [RelativeTime] of the initial delay.
519    /// 
520    /// * `rel_int_time` - a [RelativeTime] of the interval.
521    #[inline]
522    pub 
523    fn interval_with_start_delay(start_del_time: RelativeTime, rel_int_time: RelativeTime) -> Self
524    {
525        return Self::Relative(TimerExpMode::<RelativeTime>::new_interval_with_init_delay(start_del_time, rel_int_time));
526    }
527
528    /// Sets the `timer_fd` instance to the specific value stored in the current instance.
529    fn set_timer(&self, timer_fd: &TimerFd) -> TimerResult<()>
530    {
531        match *self
532        {
533            Self::Absolute(timer_exp_mode) => 
534                return 
535                    timer_fd
536                        .get_timer()
537                        .set_time(timer_exp_mode)
538                        .map_err(|e|
539                            map_timer_err!(TimerErrorType::TimerError(e.get_errno()), "cannot set time '{}' for timer: '{}'", timer_exp_mode, timer_fd )
540                        ),
541            Self::Relative(timer_exp_mode) => 
542                return 
543                    timer_fd
544                        .get_timer()
545                        .set_time(timer_exp_mode)
546                        .map_err(|e|
547                            map_timer_err!(TimerErrorType::TimerError(e.get_errno()), "cannot set time '{}' for timer: '{}'", timer_exp_mode, timer_fd )
548                        ),
549        }
550    }
551}
552
553
/// An instance which contains the task's name and the task itself.
#[derive(Debug)]
pub(crate) struct PeriodicTaskGuardInner
{
    /// A task name.
    task_name: String,

    /// A task itself. At the moment it is mutexed in order to provide
    /// the mutability, because the current instance is wrapped into [Arc].
    /// But, the task is never executed from different threads and the mutex
    /// should be replaced with something that is [Send] and can provide
    /// a mutable reference.
    task: Mutex<PeriodicTaskHndl>,
}
567
568impl fmt::Display for PeriodicTaskGuardInner
569{
570    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result 
571    {
572        write!(f, "task name: '{}'", self.task_name)
573    }
574}
575
576impl PeriodicTaskGuardInner
577{
578    fn new(task_name: String, task_inst: PeriodicTaskHndl) -> TimerResult<Self>
579    {
580        return Ok(Self {  task_name: task_name, task: Mutex::new(task_inst) });
581    }
582
583
584}
585
/// Internal structure for every worker thread.
struct ThreadWorker
{
    /// A thread name, not a task name.
    thread_name: String,

    /// An [Injector] queue for the global tasks received from other threads.
    global_task_injector: Arc<Injector<GlobalTasks>>,

    /// An [Injector] queue for local (specific to the current thread) tasks.
    local_thread_inj: Arc<Injector<ThreadTask>>,

    /// A flag which is used to control the thread's loop. It is set to `false`
    /// when the thread should exit.
    thread_run_flag: Arc<AtomicBool>,

    /// A shared instance of the [SharedPeriodicTasks] which holds all information
    /// about tasks and FD polling. The thread which acquired the mutex first will
    /// process the `global_task_injector` queue and poll the timers for the events which
    /// will be shared with the other threads.
    spti: Arc<Mutex<SharedPeriodicTasks>>,

    /// A last thread to which the task was assigned.
    thread_last_id: usize,

    /// A poll interrupt signal sender. It is used to interrupt the polling after
    /// a command was pushed to the global queue from the worker.
    poll_int: PollInterrupt
}
614
615impl ThreadWorker
616{
617    fn new(thread_name: String, global_task_injector: Arc<Injector<GlobalTasks>>, 
618        spti: Arc<Mutex<SharedPeriodicTasks>>, poll_int: PollInterrupt) -> TimerResult<ThreadHandler>
619    {
620        let local_thread_inj = Arc::new(Injector::<ThreadTask>::new());
621        let thread_run_flag = Arc::new(AtomicBool::new(true));
622        let thread_run_flag_weak = Arc::downgrade(&thread_run_flag);
623
624        let worker = 
625            ThreadWorker
626            {
627                thread_name: 
628                    thread_name.clone(),
629                global_task_injector: 
630                    global_task_injector,
631                local_thread_inj:
632                    local_thread_inj.clone(),
633                thread_run_flag: 
634                    thread_run_flag,
635                spti:
636                    spti,
637                thread_last_id:
638                    0,
639                poll_int:
640                    poll_int
641            };
642
643        let thread_hndl =
644            std::thread::Builder::new()
645                .name(thread_name)
646                .spawn(|| worker.worker())
647                .map_err(|e|
648                    map_timer_err!(TimerErrorType::SpawnError(e.kind()), "{}", e)
649                )?;
650            
651
652        return Ok( ThreadHandler::new(thread_hndl, local_thread_inj, thread_run_flag_weak) );
653    }
654
655
656    fn worker(mut self) -> TimerResult<()>
657    {
658        // force to park to allow the main thread to initialize everything
659        std::thread::park();
660
661        while self.thread_run_flag.load(Ordering::Acquire) == true
662        {
663            // check local queue for the task or token
664            while let Steal::Success(task) = self.local_thread_inj.steal()
665            {
666                match task
667                {
668                    ThreadTask::TaskExec(task_exec) =>
669                    {
670                        let Some(ptgi) = task_exec.ptgi.upgrade()
671                            else
672                            {   
673                                // task was removed while was in queue.
674                                continue;
675                            };
676
677                        // call task
678                        match ptgi.task.lock().unwrap().exec()
679                        {
680                            PeriodicTaskResult::Ok => 
681                                {},
682                            PeriodicTaskResult::CancelTask => 
683                            {
684                                self
685                                    .global_task_injector
686                                    .push(GlobalTasks::SuspendTask(task_exec.ptgi.clone(), None));
687
688                                let _ = self.poll_int.aquire().map(|v| v.interrupt_drop());
689                            },
690                            PeriodicTaskResult::TaskReSchedule(ptt) => 
691                            {
692                                self
693                                    .global_task_injector
694                                    .push(GlobalTasks::ReschedTask(task_exec.ptgi.clone(), ptt, None));
695
696                                let _ = self.poll_int.aquire().map(|v| v.interrupt_drop());
697                            }
698                        }
699
700                        drop(task_exec);
701                    }
702                }
703            }
704
705            let spti_lock_res =  self.spti.try_lock();
706
707            if let Ok(mut task_token) = spti_lock_res
708            {
709                let thread_hndl_cnt = task_token.thread_pool.get().unwrap().len();
710
711                // check global task queue
712                while let Steal::Success(task) = self.global_task_injector.steal()
713                {
714                    match task
715                    {
716                        GlobalTasks::AddTask(task_name, ptt, ptg, opt_err_ret) => 
717                        {
718                            if task_token.contains_task(&task_name) == true
719                            {
720                                if let Some(err_ret) = opt_err_ret
721                                {
722                                    let err_msg = 
723                                        map_timer_err!(TimerErrorType::Duplicate, 
724                                            "thread: '{}', task: '{}' already exists", self.thread_name, task_name);
725
726                                    let _ = err_ret.send(Err(err_msg));
727                                }
728
729                                continue;
730                            }
731
732                            let period_task_ticket = 
733                                PeriodicTaskTicket::new(task_name.clone(), ptt, ptg, &task_token.timers_poll);
734
735                            if let Err(e) = period_task_ticket 
736                            {
737                                if let Some(err_ret) = opt_err_ret
738                                {
739                                    let _ = err_ret.send(Err(e));
740                                }
741
742                                continue;
743                            }
744
745                            let period_task_ticket = period_task_ticket.unwrap();
746                            
747                            task_token.insert_task(period_task_ticket);
748                            
749                            if let Some(err_ret) = opt_err_ret
750                            {
751                                let _ = err_ret.send(Ok(()));
752                            }
753                        },
754                        GlobalTasks::RemoveTask(ptg_arc, opt_err_ret) => 
755                        {
756                            let res = task_token.remove_task(&ptg_arc.task_name);
757
758                            if let Err(e) = res
759                            {
760                                if let Some(err_ret) = opt_err_ret
761                                {
762                                    let err_msg = 
763                                        map_timer_err!(e.get_error_type(), 
764                                            "thread: '{}', {}", self.thread_name, e.get_error_msg());
765
766                                    let _ = err_ret.send(Err(err_msg));
767                                }
768
769                                continue;
770                            }
771
772                            let ptt_ref = res.unwrap();
773
774                            //let ptt = ptt_ref.into_inner();
775
776                            // stop timer
777                            if let Err(e) = ptt_ref.unset_timer()
778                            {
779                                if let Some(err_ret) = opt_err_ret.as_ref()
780                                {
781                                    let _ = err_ret.send(Err(e));
782                                }
783
784                                continue;
785                            }
786
787                            drop(ptt_ref);
788
789                            /*if let Some(err_ret) = opt_err_ret
790                            {
791                                let _ = err_ret.send(Ok(()));
792                            }
793
794                            if let Some(ptt_ref) = task_token.tasks.remove(&ptg_arc.task_name)
795                            {
796                                
797                            }
798                            else
799                            {
800                                if let Some(err_ret) = opt_err_ret
801                                {
802                                    let err_msg = 
803                                        map_timer_err!(TimerErrorType::NotFound, 
804                                            "thread: '{}', task: '{}' was not found", self.thread_name, ptg_arc.task_name);
805
806                                    let _ = err_ret.send(Err(err_msg));
807                                }
808                            }*/
809
810                            drop(ptg_arc);
811                        },
812                        GlobalTasks::ReschedTask( ptg_weak, ptt, opt_err_ret ) =>
813                        {
814                            let Some(ptg_arc) = ptg_weak.upgrade()
815                                else
816                                {   
817                                    // task was removed while was in queue.
818                                    continue;
819                                };
820
821                            // replace the time
822                            /*let res_task = 
823                                task_token
824                                    .tasks
825                                    .get(&ptg_arc.task_name)
826                                    .ok_or_else(||
827                                        map_timer_err!(TimerErrorType::NotFound, 
828                                            "thread: '{}', task: '{}' was not found", self.thread_name, ptg_arc.task_name)
829                                    );*/
830
831                            let res_task = task_token.get_task_by_name(&ptg_arc.task_name);
832
833                            let res = 
834                                match res_task
835                                {
836                                    Ok(task) => 
837                                    {
838                                        // stop timer
839                                        let _ = task.unset_timer();
840
841                                        // update timer value and set timer
842                                        let res = task.update_task_time(ptt);
843
844                                        if let Err(e) = res
845                                        {
846                                            if let Some(err_ret) = opt_err_ret.as_ref()
847                                            {
848                                                let _ = err_ret.send(Err(e));
849                                            }
850
851                                            continue;
852                                        }
853
854                                        Ok(())
855                                    },
856                                    Err(err) => 
857                                    {
858                                        Err(err)
859                                    }
860                                };
861
862                            if let Some(err_ret) = opt_err_ret
863                            {
864                                let _ = err_ret.send(res);
865                            }
866                        },
867                        GlobalTasks::SuspendTask(ptg_weak, opt_err_ret) =>
868                        {
869                            let Some(ptg_arc) = ptg_weak.upgrade()
870                                else
871                                {   
872                                    // task was removed while was in queue.
873                                    continue;
874                                };
875
876                            /*let res_task = 
877                                task_token
878                                    .tasks
879                                    .get(&ptg_arc.task_name)
880                                    .ok_or_else(||
881                                        map_timer_err!(TimerErrorType::NotFound, 
882                                            "thread: '{}', task: '{}' was not found", self.thread_name, ptg_arc.task_name)
883                                    );*/
884
885                            let res_task = task_token.get_task_by_name(&ptg_arc.task_name);
886
887                            if let Err(e) = res_task
888                            {
889                                if let Some(err_ret) = opt_err_ret.as_ref()
890                                {
891                                    let _ = err_ret.send(Err(e));
892                                }
893
894                                continue;
895                            }
896
897                            let task = res_task.unwrap();
898
899                            let res = task.unset_timer();
900
901                            if let Some(err_ret) = opt_err_ret
902                            {
903                                let _ = err_ret.send(res);
904                            }
905                        },
906                        GlobalTasks::ResumeTask(ptg_weak, opt_err_ret) =>
907                        {
908                            let Some(ptg_arc) = ptg_weak.upgrade()
909                                else
910                                {   
911                                    // task was removed while was in queue.
912                                    continue;
913                                };
914
915                            /*let res_task = 
916                                task_token
917                                    .tasks
918                                    .get(&ptg_arc.task_name)
919                                    .ok_or_else(||
920                                        map_timer_err!(TimerErrorType::NotFound, 
921                                            "thread: '{}', timer with FD: '{}' was not found", self.thread_name, ptg_arc.task_name)
922                                    );*/
923
924                            let res_task = task_token.get_task_by_name(&ptg_arc.task_name);
925
926                            if let Err(e) = res_task
927                            {
928                                if let Some(err_ret) = opt_err_ret.as_ref()
929                                {
930                                    let _ = err_ret.send(Err(e));
931                                }
932
933                                continue;
934                            }
935
936                            let res = res_task.unwrap().set_timer();
937
938                            if let Some(err_ret) = opt_err_ret.as_ref()
939                            {
940                                let _ = err_ret.send(res);
941                            }
942                        }
943                    }
944                }
945
946                // poll
947                let Some(res) = 
948                    task_token
949                        .timers_poll
950                        .poll(Some(5000))?
951                else
952                    {
953                        continue;
954                    };
955
956
957                for event in res
958                {
959                    match event
960                    {
961                        PollEventType::TimerRes(timer_fd, timer_res) => 
962                        {
963                            // resolve fd into task
964                            /*let task = 
965                                task_token
966                                    .tasks
967                                    .get(&timer_fd)
968                                    .ok_or_else(||
969                                        map_timer_err!(TimerErrorType::NotFound, 
970                                            "thread: '{}', timer with FD: '{}' was not found", self.thread_name, timer_fd)
971                                    )?;*/
972                            
973                            let (ptg, ptt, ticket_arc_opt, task_name) = 
974                            {
975                                let task = 
976                                    task_token
977                                        .get_task_by_fd(timer_fd)
978                                        .map_err(|e|
979                                            map_timer_err!(e.get_error_type(), "thread: '{}', {}", self.thread_name, e.get_error_msg())
980                                        )?;
981
982                                    (task.ptg.clone(), task.ptt.clone(), task.weak_ticket.upgrade(), task.task_name.clone())
983                            };
984
985                            /*let ptg = task.ptg.clone();
986                            let ptt = task.ptt.clone();
987                            // check if ticket presents, if not create
988                            let ticket_arc_opt = task.weak_ticket.upgrade();
989                            let task_name = task.task_name.clone();*/
990
991                  
992
993                            // check that PeriodicTaskGuard is ok
994                            let Some(ptg_arc) = ptg.upgrade()
995                                else
996                                {
997                                    //let task_name = task.task_name.clone();
998
999                                    // if Arc cannot be upgraded, then remove the task as removed
1000                                    //task_token.tasks.remove(&timer_fd);
1001                                    let _ = task_token.remove_task(&task_name);
1002
1003                                    // continue event handling
1004                                    continue;
1005                                };
1006
1007                            // check timer result
1008                            let overflow_cnt: u64 = 
1009                                match timer_res
1010                                {
1011                                    TimerReadRes::Ok(overfl) => 
1012                                    {
1013                                        overfl
1014                                    },
1015                                    TimerReadRes::Cancelled => 
1016                                    {
1017                                        // the system time was modified, resched instance
1018
1019                                        self
1020                                            .global_task_injector
1021                                            .push(
1022                                                GlobalTasks::ReschedTask(ptg.clone(), ptt.clone(), None)
1023                                            );
1024
1025                                        // timer was reset continue
1026                                        continue;
1027                                    },
1028                                    TimerReadRes::WouldBlock => 
1029                                    {
1030                                        // should not happen, silently ignore or panic?
1031                                        panic!("assertion trap: timer retuned WouldBlock, {}", ptg_arc);
1032                                    }
1033                                };
1034
1035                            // check if ticket presents, if not create
1036                           // let ticket_arc_opt = task.weak_ticket.upgrade();
1037
1038
1039                            let ticket = 
1040                                match ticket_arc_opt
1041                                {
1042                                    Some(ticket) => 
1043                                        ticket,
1044                                    None => 
1045                                    {    
1046                                        let task_thread = 
1047                                            {
1048                                                /*let thread_local_hnd = 
1049                                                    task_token
1050                                                        .thread_pool
1051                                                        .get()
1052                                                        .unwrap()[self.thread_last_id]
1053                                                        .get_thread_handler();*/
1054
1055                                               
1056
1057                                                self.thread_last_id = (self.thread_last_id + 1) % thread_hndl_cnt;
1058
1059                                                //thread_local_hnd
1060                                                task_token.clone_thread_handler(self.thread_last_id)
1061                                            }; 
1062
1063                                        let ticket =
1064                                            {
1065                                                //let b_task = task.borrow();
1066                                                Arc::new(
1067                                                    NewTaskTicket::new(task_thread, ptg.clone())
1068                                                )
1069                                            };
1070
1071                                        let task = 
1072                                            task_token
1073                                                .get_task_by_fd(timer_fd)
1074                                                .map_err(|e|
1075                                                    map_timer_err!(e.get_error_type(), "thread: '{}', {}", self.thread_name, e.get_error_msg())
1076                                                )?;
1077                                                
1078                                        task.weak_ticket = Arc::downgrade(&ticket);
1079
1080                                        ticket
1081                                    }
1082                                };
1083                            
1084
1085                            
1086                            NewTaskTicket::send_task(ticket, overflow_cnt, thread_hndl_cnt);
1087                        },
1088                        _ => 
1089                        {
1090                            // ignore
1091                        },
1092                    }
1093                } // for
1094            }
1095            else if let Err(TryLockError::WouldBlock) = spti_lock_res
1096            {
1097                if self.thread_run_flag.load(Ordering::Acquire) == false
1098                {
1099                    return Ok(());
1100                }
1101
1102                if self.local_thread_inj.is_empty() == true
1103                {
1104                    std::thread::park_timeout(Duration::from_secs(2));
1105                }
1106            } 
1107            
1108        } // while
1109
1110        return Ok(());
1111    }
1112}
1113
1114
1115/// A common instance which contains the specific thread handler and
1116/// a [Weak] reference to the loop controlling flag.
1117#[derive(Debug)]
1118struct ThreadHandler
1119{
1120    /// A thread join handler
1121    hndl: JoinHandle<TimerResult<()>>,
1122
1123    /// A local task injector.
1124    task_injector: Arc<Injector<ThreadTask>>,
1125
1126    /// A flag which controls the thread loop. Initialized as `true`.
1127    thread_flag: Weak<AtomicBool>,
1128}
1129
1130impl ThreadHandler
1131{
1132    fn new(hndl: JoinHandle<TimerResult<()>>, task_injector: Arc<Injector<ThreadTask>>, thread_flag: Weak<AtomicBool>) -> Self
1133    {            
1134        return 
1135            Self
1136            {
1137                hndl,
1138                task_injector,
1139                thread_flag: thread_flag
1140            };
1141    }
1142
1143    fn stop(&self)
1144    {
1145        if let Some(v) = self.thread_flag.upgrade()
1146        {
1147            v.store(false, Ordering::Release);
1148        }
1149    }
1150
1151    fn unpark(&self) 
1152    {
1153        self.hndl.thread().unpark();
1154    }
1155
1156    fn send_task(&self, task: Arc<NewTaskTicket>, unpark: bool)
1157    {
1158        self.task_injector.push(ThreadTask::TaskExec(task));
1159
1160        if unpark == true
1161        {
1162            self.hndl.thread().unpark();
1163        }
1164
1165        return;
1166    }
1167
1168    fn clean_local_queue(&self)
1169    {
1170        while let Steal::Success(_) = self.task_injector.steal() {}
1171
1172        return;
1173    }
1174}
1175
1176/// A instance which contains all information about the threads allocated for the
1177/// task execution, tasks and timer polling. An instance is shared with all threads.
1178#[derive(Debug)]
1179pub struct SharedPeriodicTasks
1180{
1181    /// A list of the threads. A [OnceCell] is used to initialize the field once.
1182    thread_pool: OnceCell<Arc<Vec<Arc<ThreadHandler>>>>,
1183
1184    /// A [HashMap] of the regestered tasks. Each task is mapped to its timer FD.
1185    tasks_by_fd: HashMap<RawFd, PeriodicTaskTicket>,
1186
1187    tasks_name_to_fd: HashMap<String, RawFd>,
1188
1189    /// A timer event poll. 
1190    timers_poll: TimerPoll,
1191}
1192
1193
1194impl SharedPeriodicTasks
1195{ 
1196    fn new() -> TimerResult<Self>
1197    {
1198
1199        return Ok( 
1200            Self 
1201            { 
1202                thread_pool: OnceCell::default(),
1203                tasks_by_fd: HashMap::new(),
1204                tasks_name_to_fd: HashMap::new(),
1205                timers_poll: TimerPoll::new()?
1206            }
1207        );
1208    }
1209
1210    fn get_task_by_fd(&mut self, timer_fd: RawFd) -> TimerResult<&mut PeriodicTaskTicket>
1211    {
1212        return 
1213            self
1214                .tasks_by_fd
1215                .get_mut(&timer_fd)
1216                .ok_or_else(||
1217                    map_timer_err!(TimerErrorType::NotFound, "task fd: '{}' was found but task was not found", 
1218                        timer_fd.as_raw_fd())
1219                );
1220    }
1221
1222    fn get_task_by_name(&mut self, task_name: &str) -> TimerResult<&mut PeriodicTaskTicket>
1223    {
1224        let res = 
1225            self
1226                .tasks_name_to_fd
1227                .get(task_name)
1228                .ok_or_else(||
1229                    map_timer_err!(TimerErrorType::NotFound, "task: '{}' was not found", task_name)
1230                )
1231                .map(|v|
1232                    self
1233                        .tasks_by_fd
1234                        .get_mut(v)
1235                        .ok_or_else(||
1236                            map_timer_err!(TimerErrorType::NotFound, "task: '{}' fd: '{}' was found but task was not found", 
1237                                task_name, v)
1238                        )
1239                )??;
1240
1241        return Ok(res);
1242    }
1243
1244    fn clone_thread_handler(&self, thread_last_id: usize) -> Arc<ThreadHandler>
1245    {
1246        let thread_local_hnd = 
1247            self
1248                .thread_pool
1249                .get()
1250                .unwrap()[thread_last_id]
1251                .clone();
1252
1253        return thread_local_hnd;
1254    }
1255
1256    fn remove_task(&mut self, task_name: &str) -> TimerResult<PeriodicTaskTicket>
1257    {
1258        let Some(task_timer_fd) = self.tasks_name_to_fd.remove(task_name)
1259            else
1260            {
1261                timer_err!(TimerErrorType::NotFound, "task: '{}' was not found", task_name);
1262            };
1263
1264        let Some(ptt) = self.tasks_by_fd.remove(&task_timer_fd)
1265            else
1266            {
1267                timer_err!(TimerErrorType::NotFound, "task: '{}' fd: '{}' was found but task was not found", 
1268                    task_name, task_timer_fd);
1269            };
1270
1271        return Ok(ptt);
1272    }
1273
1274    fn contains_task(&self, task_name: &str) -> bool
1275    {
1276        return self.tasks_name_to_fd.contains_key(task_name);
1277    }
1278
1279    fn insert_task(&mut self, period_task_ticket: PeriodicTaskTicket)
1280    {
1281        let raw_fd = period_task_ticket.sync_timer.get_inner().as_raw_fd();
1282
1283        self.tasks_name_to_fd.insert(period_task_ticket.task_name.clone(), raw_fd);
1284        self.tasks_by_fd.insert(raw_fd, period_task_ticket);
1285
1286        return;
1287    }
1288}
1289
1290#[derive(Debug)]
1291pub struct SyncPeriodicTasksInner
1292{
1293    /// A [Weak] reference to the [EventFd] of the timer `poll` which is used to interrupt the polling.
1294    poll_int: PollInterrupt,
1295
1296    /// A `global` FIFO which is used to send commands to the task executor.
1297    task_injector: Arc<Injector<GlobalTasks>>,
1298}
1299
1300impl SyncPeriodicTasksInner
1301{
1302    fn send_global_cmd(&self, glob: GlobalTasks) -> TimerResult<()>
1303    {
1304        let poll_int = 
1305            self.poll_int.aquire()?;
1306                
1307        self.task_injector.push(glob);
1308
1309        poll_int.interrupt_drop()?;
1310
1311        return Ok(());
1312    }
1313
1314    fn clear_global_queue(&self)
1315    {
1316        while let Steal::Success(_) = self.task_injector.steal() {}
1317
1318        return;
1319    }
1320}
1321
1322/// A `main` instance which spawns the task executor.
1323/// 
1324/// ```ignore
1325/// let spt = SyncPeriodicTasks::new(1.try_into().unwrap()).unwrap();
1326/// let task1 = TaskStruct1::new(2, send);
1327/// let task1_ptt = PeriodicTaskTime::Relative(TimerExpMode::<RelativeTime>::new_interval(RelativeTime::new_time(1, 0)));
1328/// let task1_guard = s.add("task1", task1, task1_ptt).unwrap();
1329/// ```
1330/// 
1331/// This instance should be kept somewhere and dropped only if the task executor WITH
1332/// all spawned tasks are no longer needed.
1333#[derive(Debug, Clone)]
1334pub struct SyncPeriodicTasks
1335{
1336    threads: Option<Arc<Vec<Arc<ThreadHandler>>>>,
1337
1338    inner: Arc<SyncPeriodicTasksInner>,
1339}
1340
1341impl Drop for SyncPeriodicTasks
1342{
1343    fn drop(&mut self) 
1344    {
1345        self.inner.clear_global_queue();
1346
1347        let mut threads = self.threads.take().unwrap();
1348
1349        // stop all threads
1350        for thread in threads.iter()
1351        {
1352            thread.stop();
1353            thread.unpark();
1354        }
1355
1356        // interrupt poll
1357        let _ = self.inner.poll_int.aquire().map(|v| v.interrupt_drop());
1358
1359        for _ in 0..5
1360        {
1361            let threads_unwr = 
1362                match Arc::try_unwrap(threads)
1363                {
1364                    Ok(r) => r,
1365                    Err(e) =>
1366                    {
1367                        threads = e;
1368
1369                        std::thread::sleep(Duration::from_millis(500));
1370
1371                        continue;
1372
1373                    }
1374                };
1375
1376            for thread in threads_unwr
1377            {
1378                thread.clean_local_queue();
1379
1380                let Some(thread) = Arc::into_inner(thread)
1381                else
1382                {
1383                    panic!("assertion trap: ~SyncPeriodicTasks, a reference to ThreadHandler left somewhere");
1384                };
1385
1386                let _ = thread.hndl.join();
1387            }
1388
1389            break;
1390        }
1391    }
1392}
1393
1394
1395impl SyncPeriodicTasks
1396{
1397
1398    
1399    /// Creates new instance. An amount of threads allocated for the task executor
1400    /// should be specified. All threads will be started immidiatly. For small
1401    /// tasks one thread will be enough. For a large amount of tasks, especially it
1402    /// tasks are waken up oftenly then at least two threads should be allocated.
1403    /// 
1404    /// # Arguments
1405    /// 
1406    /// * `threads_cnt` - a [NonZeroUsize] amount of threads.
1407    /// 
1408    /// # Returns
1409    /// 
1410    /// The [Result] is returned as alias [TimerResult].
1411    pub 
1412    fn new(threads_cnt: NonZeroUsize) -> TimerResult<Self>
1413    {
1414        let spti = SharedPeriodicTasks::new()?;
1415        let poll_int = spti.timers_poll.get_poll_interruptor();
1416
1417        // wrap into the mutex because this instance is shared 
1418        let spti = Arc::new(Mutex::new(spti));
1419
1420        let task_injector = Arc::new(Injector::<GlobalTasks>::new());
1421
1422        let mut thread_hndls: Vec<Arc<ThreadHandler>> = Vec::with_capacity(threads_cnt.get());
1423
1424        // spawn threads, all spawn threads will be parked by default.
1425        for i in 0..threads_cnt.get()
1426        {
1427            let handler = 
1428                ThreadWorker::new(format!("timer_exec/{}s", i), task_injector.clone(), spti.clone(), poll_int.clone())?;
1429
1430            thread_hndls.push(Arc::new(handler));
1431        }
1432
1433        let thread_hndls = Arc::new(thread_hndls);
1434
1435        // Lock the instance to initialze the `thread_pool` variable.
1436        let spti_lock = spti.lock().unwrap();
1437        spti_lock.thread_pool.get_or_init(|| thread_hndls.clone());
1438
1439        // get a random thread to unpark it and start the process
1440        let thread = 
1441            spti_lock
1442                .thread_pool
1443                .get()
1444                .unwrap()
1445                .get(random_range(0..threads_cnt.get()))
1446                .unwrap()
1447                .clone();
1448
1449        drop(spti_lock);
1450
1451        // unpark thread
1452        thread.unpark();
1453
1454        let inner = 
1455            SyncPeriodicTasksInner
1456            {
1457                poll_int: poll_int,
1458                task_injector: task_injector,
1459            };
1460
1461        return Ok( 
1462            Self 
1463            { 
1464                threads: Some(thread_hndls),
1465                inner: Arc::new(inner),
1466            }
1467        );
1468    }
1469
1470    /// Adds and spawns the task,
1471    /// 
1472    /// # Arguments
1473    /// 
1474    /// * `task_name` - a task name. used only for identification purposes in debug messages.
1475    /// 
1476    /// * `task` - a task which should be executed. It should implenet [PeriodicTask].
1477    /// 
1478    /// * `task_time` - [PeriodicTaskTime] a time when the task must be spawned.
1479    /// 
1480    /// # Returns
1481    /// 
1482    /// The [Result] is returned as alias [TimerResult].
1483    pub 
1484    fn add<T>(&self, task_name: impl Into<String>, task: T, task_time: PeriodicTaskTime) -> TimerResult<PeriodicTaskGuard>
1485    where T: PeriodicTask
1486    {
1487        let task_int: PeriodicTaskHndl = Box::new(task);
1488
1489        let task_name_str: String = task_name.into();
1490
1491        let period_task_guard = 
1492            Arc::new(PeriodicTaskGuardInner::new(task_name_str.clone(), task_int)?);
1493
1494
1495       /* let period_task_ticket = 
1496            PeriodicTaskTicket::new(task_name_str.clone(), task_time, Arc::downgrade(&period_task_guard));*/
1497
1498        let (mpsc_send, mpsc_recv) = mpsc::channel();
1499
1500        self.inner.send_global_cmd(GlobalTasks::AddTask(task_name_str.clone(), task_time, period_task_guard.clone(), Some(mpsc_send)) )?;
1501
1502        let _ = 
1503            mpsc_recv
1504                .recv()
1505                .map_err(|e|
1506                    map_timer_err!(TimerErrorType::ExternalError, "mpsc error: {}", e)
1507                )??;
1508
1509        let ret = 
1510            PeriodicTaskGuard
1511            {
1512                task_name: task_name_str,
1513                guard: Some(period_task_guard),
1514                spt: self.inner.clone()
1515            };
1516
1517        return Ok(ret);
1518    }
1519
1520    /// Checks if any thread have crashed and no longer works.
1521    /// 
1522    /// # Returns
1523    /// 
1524    /// A [Option] is retuerned with the inner data:
1525    /// 
1526    /// * [Option::Some] with the thread name [String] that have quit.
1527    /// 
1528    /// * [Option::None] indicating that everthing is fine.
1529    pub 
1530    fn check_thread_status(&self) -> Option<String>
1531    {
1532        for thread in self.threads.as_ref().unwrap().iter()
1533        {
1534            if let None = thread.thread_flag.upgrade()
1535            {
1536                return Some(thread.hndl.thread().name().unwrap().to_string());
1537            }
1538        }
1539
1540        return None;
1541    }
1542}
1543
1544#[cfg(test)]
1545mod tests
1546{
1547    use std::{sync::mpsc::{self, RecvTimeoutError, Sender}, time::{Duration, Instant}};
1548
1549    use crate::{periodic_task::sync_tasks::{PeriodicTask, PeriodicTaskResult, PeriodicTaskTime, SyncPeriodicTasks}, AbsoluteTime, RelativeTime};
1550
1551    #[derive(Debug)]
1552    struct TaskStruct1
1553    {
1554        a1: u64,
1555        s: Sender<u64>,
1556    }
1557
1558    impl TaskStruct1
1559    {
1560        fn new(a1: u64, s: Sender<u64>) -> Self
1561        {
1562            return Self{ a1: a1, s };
1563        }
1564    }
1565
1566    impl PeriodicTask for TaskStruct1
1567    {
1568        fn exec(&mut self) -> PeriodicTaskResult
1569        {
1570            println!("taskstruct1 val: {}", self.a1);
1571
1572            let _ = self.s.send(self.a1);
1573
1574            return PeriodicTaskResult::Ok;
1575        }
1576    }
1577
1578    #[derive(Debug)]
1579    struct TaskStruct2
1580    {
1581        a1: u64,
1582        s: Sender<u64>,
1583    }
1584
1585    impl TaskStruct2
1586    {
1587        fn new(a1: u64, s: Sender<u64>) -> Self
1588        {
1589            return Self{ a1: a1, s };
1590        }
1591    }
1592
1593    impl PeriodicTask for TaskStruct2
1594    {
1595        fn exec(&mut self) -> PeriodicTaskResult
1596        {
1597            println!("taskstruct2 val: {}", self.a1);
1598
1599            self.s.send(self.a1).unwrap();
1600
1601            return PeriodicTaskResult::TaskReSchedule(PeriodicTaskTime::exact_time(AbsoluteTime::now() + RelativeTime::new_time(2, 0)));
1602        }
1603    }
1604
1605    #[test]
1606    fn test1_absolute_simple()
1607    {
1608        let s = SyncPeriodicTasks::new(1.try_into().unwrap()).unwrap();
1609
1610        let (send, recv) = mpsc::channel::<u64>();
1611
1612        let task1 = TaskStruct1::new(2, send);
1613        let task1_ptt = PeriodicTaskTime::exact_time(AbsoluteTime::now() + RelativeTime::new_time(3, 0));
1614        let task1_guard = s.add("task1", task1, task1_ptt).unwrap();
1615
1616        println!("added");
1617        let val = recv.recv();
1618
1619        println!("{:?}", val);
1620
1621        drop(task1_guard);
1622    }
1623
1624    #[test]
1625    fn test1_relative_simple()
1626    {
1627        let s = SyncPeriodicTasks::new(1.try_into().unwrap()).unwrap();
1628
1629        let (send, recv) = mpsc::channel::<u64>();
1630
1631        let task1 = TaskStruct1::new(2, send);
1632        let task1_ptt = PeriodicTaskTime::interval(RelativeTime::new_time(1, 0));
1633        let task1_guard = s.add("task1", task1, task1_ptt).unwrap();
1634        
1635        let mut s = Instant::now();
1636
1637        for i in 0..3
1638        {
1639            let val = recv.recv().unwrap();
1640
1641            let e = s.elapsed();
1642            s = Instant::now();
1643
1644            println!("{}: {:?} {:?} {}", i, val, e, e.as_micros());
1645            
1646            assert!(999000 < e.as_micros() && e.as_micros() < 10001200);
1647            assert_eq!(val, 2);
1648        }
1649
1650        drop(task1_guard);
1651
1652        std::thread::sleep(Duration::from_millis(100));
1653
1654        return;
1655    }
1656    
1657    #[test]
1658    fn test1_relative_resched_to_abs()
1659    {
1660        let s = SyncPeriodicTasks::new(1.try_into().unwrap()).unwrap();
1661
1662        let (send, recv) = mpsc::channel::<u64>();
1663
1664        let task1 = TaskStruct2::new(2, send);
1665        let task1_ptt = PeriodicTaskTime::interval(RelativeTime::new_time(1, 0));
1666        let task1_guard = s.add("task1", task1, task1_ptt).unwrap();
1667
1668        let s = Instant::now();
1669        match recv.recv_timeout(Duration::from_millis(1150))
1670        {
1671            Ok(rcv_a) => 
1672            {
1673                let e = s.elapsed();
1674                println!("{:?} {}", e, e.as_micros());
1675                assert_eq!(rcv_a, 2);
1676                assert!(999051 < e.as_micros() && e.as_micros() < 1000551);
1677            },
1678            Err(RecvTimeoutError::Timeout) => 
1679                panic!("tineout"),
1680            Err(e) =>
1681                panic!("{}", e),
1682        }
1683
1684        let s = Instant::now();
1685        match recv.recv_timeout(Duration::from_millis(2100))
1686        {
1687            Ok(rcv_a) => 
1688            {
1689                let e = s.elapsed();
1690                println!("{:?} {}", e, e.as_micros());
1691                assert_eq!(rcv_a, 2);
1692                assert!(1999642 < e.as_micros() && e.as_micros() < 2000342);
1693            },
1694            Err(RecvTimeoutError::Timeout) => 
1695                panic!("tineout"),
1696            Err(e) =>
1697                panic!("{}", e),
1698        }
1699
1700        let s = Instant::now();
1701        match recv.recv_timeout(Duration::from_millis(2100))
1702        {
1703            Ok(rcv_a) => 
1704            {
1705                let e = s.elapsed();
1706                println!("{:?} {}", e, e.as_micros());
1707                assert_eq!(rcv_a, 2);
1708                assert!(1999642 < e.as_micros() && e.as_micros() < 2000342);
1709            },
1710            Err(RecvTimeoutError::Timeout) => 
1711                panic!("tineout"),
1712            Err(e) =>
1713                panic!("{}", e),
1714        }
1715
1716        drop(task1_guard);
1717
1718        std::thread::sleep(Duration::from_millis(100));
1719
1720        return;
1721    }
1722
1723    #[test]
1724    fn test1_relative_simple_resched()
1725    {
1726        let s = SyncPeriodicTasks::new(1.try_into().unwrap()).unwrap();
1727
1728        let (send, recv) = mpsc::channel::<u64>();
1729
1730        let task1 = TaskStruct1::new(2, send);
1731        let task1_ptt = 
1732            PeriodicTaskTime::interval(
1733                RelativeTime::new_time(1, 0)
1734            );
1735        let task1_guard = s.add("task1", task1, task1_ptt).unwrap();
1736        
1737        let mut s = Instant::now();
1738
1739        for i in 0..3
1740        {
1741            let val = recv.recv().unwrap();
1742
1743            let e = s.elapsed();
1744            s = Instant::now();
1745
1746            println!("{}: {:?} {:?} {}", i, val, e, e.as_micros());
1747            
1748            assert!(999000 < e.as_micros() && e.as_micros() < 10001200);
1749            assert_eq!(val, 2);
1750        }
1751
1752        task1_guard
1753            .reschedule_task(
1754                PeriodicTaskTime::exact_time(AbsoluteTime::now() + RelativeTime::new_time(2, 0))
1755            )
1756            .unwrap();
1757
1758        s = Instant::now();
1759        let val = recv.recv().unwrap();
1760        let e = s.elapsed();
1761
1762        println!("resched: {:?} {:?} {}", val, e, e.as_micros());
1763
1764        assert!(1999000 < e.as_micros() && e.as_micros() < 2000560);
1765
1766        let val = recv.recv_timeout(Duration::from_secs(3));
1767
1768        assert_eq!(val.is_err(), true);
1769        assert_eq!(val.err().unwrap(), RecvTimeoutError::Timeout);
1770
1771        drop(task1_guard);
1772
1773        std::thread::sleep(Duration::from_millis(100));
1774
1775        return;
1776    }
1777
1778    #[test]
1779    fn test1_relative_simple_cancel()
1780    {
1781        let s = SyncPeriodicTasks::new(1.try_into().unwrap()).unwrap();
1782
1783        let (send, recv) = mpsc::channel::<u64>();
1784
1785        let task1 = TaskStruct1::new(0, send.clone());
1786        let task1_ptt = 
1787            PeriodicTaskTime::interval(RelativeTime::new_time(1, 0));
1788
1789        let task2 = TaskStruct1::new(1, send.clone());
1790        let task2_ptt = 
1791            PeriodicTaskTime::interval(RelativeTime::new_time(2, 0));
1792
1793        let task3 = TaskStruct1::new(2, send);
1794        let task3_ptt = 
1795            PeriodicTaskTime::interval(RelativeTime::new_time(0, 500_000_000));
1796
1797        let task1_guard = s.add("task1", task1, task1_ptt).unwrap();
1798        let task2_guard = s.add("task2", task2, task2_ptt).unwrap();
1799        let task3_guard = s.add("task3", task3, task3_ptt).unwrap();
1800
1801        let mut a_cnt: [u8; 3] = [0_u8; 3];
1802
1803        let end = AbsoluteTime::now() + RelativeTime::new_time(5, 100_000_000);
1804
1805        while AbsoluteTime::now() < end
1806        {
1807            match recv.recv_timeout(Duration::from_millis(1))
1808            {
1809                Ok(rcv_a) => 
1810                    a_cnt[rcv_a as usize] += 1,
1811                Err(RecvTimeoutError::Timeout) => 
1812                    continue,
1813                Err(e) =>
1814                    panic!("{}", e),
1815            }
1816
1817            
1818        }
1819
1820        assert_eq!(a_cnt[0], 5);
1821        assert_eq!(a_cnt[1], 2);
1822        assert_eq!(a_cnt[2], 10);
1823
1824        // drop task #3
1825        task3_guard.suspend_task().unwrap();
1826
1827
1828        let end = AbsoluteTime::now() + RelativeTime::new_time(5, 100_000_000);
1829
1830        while AbsoluteTime::now() < end
1831        {
1832            match recv.recv_timeout(Duration::from_millis(1))
1833            {
1834                Ok(rcv_a) => 
1835                    a_cnt[rcv_a as usize] += 1,
1836                Err(RecvTimeoutError::Timeout) => 
1837                    continue,
1838                Err(e) =>
1839                    panic!("{}", e),
1840            }
1841
1842            
1843        }
1844
1845        assert_eq!(a_cnt[0] > 5, true);
1846        assert_eq!(a_cnt[1] > 2, true);
1847        assert!((a_cnt[2] == 10 || a_cnt[2] == 11));
1848
1849        drop(task1_guard);
1850        drop(task2_guard);
1851        drop(task3_guard);
1852
1853        let end = AbsoluteTime::now() + RelativeTime::new_time(5, 100_000_000);
1854
1855        while AbsoluteTime::now() < end
1856        {
1857            match recv.recv_timeout(Duration::from_millis(1))
1858            {
1859                Ok(rcv_a) => 
1860                    a_cnt[rcv_a as usize] += 1,
1861                Err(RecvTimeoutError::Timeout) => 
1862                    continue,
1863                Err(_) =>
1864                    break,
1865            }
1866
1867            
1868        }
1869
1870        assert_eq!(AbsoluteTime::now() < end, true);
1871
1872        
1873
1874        return;
1875    }
1876
    // On FreeBSD this test failed with: src/periodic_task/sync_tasks.rs:1784:9 left: 6
1878    #[test]
1879    fn test2_multithread_1()
1880    {
1881        let s = SyncPeriodicTasks::new(2.try_into().unwrap()).unwrap();
1882
1883        let (send, recv) = mpsc::channel::<u64>();
1884
1885        let task1 = TaskStruct1::new(0, send.clone());
1886        let task1_ptt = 
1887            PeriodicTaskTime::interval(RelativeTime::new_time(1, 0));
1888
1889        let task2 = TaskStruct1::new(1, send.clone());
1890        let task2_ptt = 
1891            PeriodicTaskTime::interval(RelativeTime::new_time(2, 0));
1892
1893        let task3 = TaskStruct1::new(2, send.clone());
1894        let task3_ptt = 
1895            PeriodicTaskTime::interval(RelativeTime::new_time(0, 500_000_000));
1896
1897        let task4 = TaskStruct1::new(3, send.clone());
1898        let task4_ptt = 
1899            PeriodicTaskTime::interval(RelativeTime::new_time(0, 200_000_000));
1900
1901        let task5 = TaskStruct1::new(4, send.clone());
1902        let task5_ptt = 
1903            PeriodicTaskTime::exact_time(AbsoluteTime::now() + RelativeTime::new_time(5, 0));
1904
1905        let task1_guard = s.add("task1", task1, task1_ptt).unwrap();
1906        let task2_guard = s.add("task2", task2, task2_ptt).unwrap();
1907        let task3_guard = s.add("task3", task3, task3_ptt).unwrap();
1908        let task4_guard = s.add("task4", task4, task4_ptt).unwrap();
1909        let task5_guard = s.add("task5", task5, task5_ptt).unwrap();
1910
1911
1912        let mut a_cnt: [u8; 5] = [0_u8; 5];
1913
1914        let end = AbsoluteTime::now() + RelativeTime::new_time(5, 500_000_000);
1915
1916        while AbsoluteTime::now() < end
1917        {
1918            match recv.recv_timeout(Duration::from_millis(1))
1919            {
1920                Ok(rcv_a) => 
1921                    a_cnt[rcv_a as usize] += 1,
1922                Err(RecvTimeoutError::Timeout) => 
1923                    continue,
1924                Err(e) =>
1925                    panic!("{}", e),
1926            }
1927
1928            
1929        }
1930
1931        println!("{:?}", a_cnt);
1932
1933        assert!(a_cnt[0] == 5);
1934        assert!(a_cnt[1] == 2);
1935        assert!((a_cnt[2] == 10 || a_cnt[2] == 11));
1936        assert!(a_cnt[3] == 27);
1937        assert!(a_cnt[4] == 1);
1938
1939        task5_guard.reschedule_task(PeriodicTaskTime::exact_time(AbsoluteTime::now() + RelativeTime::new_time(0, 500_000_000))).unwrap();
1940
1941        let end = AbsoluteTime::now() + RelativeTime::new_time(0, 600_000_000);
1942
1943        while AbsoluteTime::now() < end
1944        {
1945            match recv.recv_timeout(Duration::from_millis(1))
1946            {
1947                Ok(rcv_a) => 
1948                    a_cnt[rcv_a as usize] += 1,
1949                Err(RecvTimeoutError::Timeout) => 
1950                    continue,
1951                Err(e) =>
1952                    panic!("{}", e),
1953            }
1954        }
1955
1956        println!("{:?}", a_cnt);
1957        assert!(a_cnt[4] == 2);
1958
1959        drop(task5_guard);
1960        drop(task4_guard);
1961        drop(task3_guard);
1962        drop(task2_guard);
1963
1964        let end = AbsoluteTime::now() + RelativeTime::new_time(2, 1000);
1965
1966        while AbsoluteTime::now() < end
1967        {
1968            match recv.recv_timeout(Duration::from_millis(1))
1969            {
1970                Ok(rcv_a) => 
1971                    a_cnt[rcv_a as usize] += 1,
1972                Err(RecvTimeoutError::Timeout) => 
1973                    continue,
1974                Err(e) =>
1975                    panic!("{}", e),
1976            }
1977        }
1978
1979
1980        println!("{:?}", a_cnt);
1981        assert_eq!(a_cnt[4], 2);
1982        assert_eq!(a_cnt[0], 8);
1983
1984        drop(task1_guard);
1985
1986        std::thread::sleep(Duration::from_millis(10));
1987        return;
1988    }
1989}