timer_deque_rs/periodic_task/
sync_tasks.rs

1/*-
2 * timer-deque-rs - a Rust crate which provides timer and timer queues based on target OS
3 *  functionality.
4 * 
5 * Copyright (C) 2025 Aleksandr Morozov alex@nixd.org
6 *  4neko.org alex@4neko.org
7 * 
8 * The timer-rs crate can be redistributed and/or modified
9 * under the terms of either of the following licenses:
10 *
11 *   1. the Mozilla Public License Version 2.0 (the “MPL”) OR
12 *                     
13 *   2. The MIT License (MIT)
14 *                     
15 *   3. EUROPEAN UNION PUBLIC LICENCE v. 1.2 EUPL © the European Union 2007, 2016
16 */
17
18use std::
19{
20    cell::OnceCell, 
21    cmp, 
22    collections::HashMap, 
23    fmt, 
24    num::NonZeroUsize, 
25    os::fd::{AsRawFd, RawFd}, 
26    sync::
27    {
28        Arc, Mutex, TryLockError, Weak, atomic::{AtomicBool, Ordering}, mpsc::{self}
29    }, 
30    thread::JoinHandle, 
31    time::Duration
32};
33
34use crossbeam_deque::{Injector, Steal};
35use rand::random_range;
36
37use crate::
38{
39    AbsoluteTime, 
40    FdTimerCom, 
41    RelativeTime, 
42    TimerPoll, 
43    TimerReadRes, 
44    error::{TimerError, TimerErrorType, TimerResult}, 
45    map_timer_err, timer_err, 
46    timer_portable::
47    {
48        PollEventType, PolledTimerFd, TimerExpMode, TimerFlags, TimerType, poll::PollInterrupt, timer::TimerFd
49    }
50};
51
/// The outcome which a user task reports back to the executor after each run.
/// See the description of each variant.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum PeriodicTaskResult
{
    /// The task exec was ok. No errors.
    Ok,

    /// Cancels (but does not remove) the task due to an error or for any other reason.
    /// On this result the worker queues a suspend request for the task, so that its
    /// timer stops generating events. 
    /// If the timer has timed out again before the request was processed, the task
    /// may be called once more. It is not necessary to return `CancelTask` again in
    /// that case, just return [PeriodicTaskResult::Ok] and exit immediately.
    CancelTask,

    /// A request to reschedule the task to a new time or interval. The `0` argument 
    /// should contain the new time. The timer mode can be changed.
    TaskReSchedule(PeriodicTaskTime)
}
71
/// A trait which should be implemented by the instance which will be added as a `task`.
/// 
/// * the `instance` must implement [Send] because it may be moved to another thread.
/// 
/// * the `instance` does not have to implement [Sync]: it is stored behind a mutex, 
///   so it is never executed by two threads at the same time.
/// 
/// A `task` should never block the thread, i.e. call functions which may block for a long
/// period of time! 
pub trait PeriodicTask: Send + 'static
{
    /// The task entry point, called on each execution round. The returned 
    /// [PeriodicTaskResult] tells the executor what to do with the task next.
    fn exec(&mut self) -> PeriodicTaskResult;
}
86
/// An alias for a boxed, dynamically dispatched [PeriodicTask].
pub type PeriodicTaskHndl = Box<dyn PeriodicTask>;
89
90
/// Commands of the global FIFO queue.
/// 
/// For all variants, the trailing [Option] of [mpsc::Sender] is used for the feedback and if
/// it is set to [Option::None] nothing will be reported.
#[derive(Debug)]
pub(crate) enum GlobalTasks
{
    /// Adds a task to the task system. The fields are: task name, timer time, task instance
    /// and the feedback channel. If an error happens (for example, a task with the same name
    /// already exists) it is reported over the feedback channel and the task is not added.
    AddTask( String, PeriodicTaskTime, Arc<PeriodicTaskGuardInner>, Option<mpsc::Sender<TimerResult<()>>> ),

    /// Removes the task from the system. The command carries the strong reference to the
    /// task instance.
    // NOTE(review): the visible handler of this command reports only failures over the
    // feedback channel; no `Ok` is sent on success — confirm this is intended.
    RemoveTask( Arc<PeriodicTaskGuardInner>, Option<mpsc::Sender<TimerResult<()>>> ),

    /// Changes the timer value. If the weak reference can no longer be upgraded, the
    /// command is skipped.
    ReschedTask( Weak<PeriodicTaskGuardInner>, PeriodicTaskTime, Option<mpsc::Sender<TimerResult<()>>> ),

    /// Suspends the task but does not remove the task. A task which is about to be suspended 
    /// but is already in the exec queue will still be executed.
    SuspendTask( Weak<PeriodicTaskGuardInner>, Option<mpsc::Sender<TimerResult<()>>> ),

    /// Resumes the task by enabling its timer.
    ResumeTask( Weak<PeriodicTaskGuardInner>, Option<mpsc::Sender<TimerResult<()>>> ),
}
115
/// Commands of the local (specific to one worker thread) task queue.
#[derive(Debug)]
pub enum ThreadTask
{
    /// Execute the task which is referenced by the ticket.
    TaskExec( Arc<NewTaskTicket> )
}
123
/// A ticket which is assigned to a specific task in order to send the same task to the same
/// thread to which it has already been assigned. If only one thread is allocated, this is not
/// really necessary (will be optimized in the future).
#[derive(Debug)]
pub struct NewTaskTicket
{
    /// A thread handler of the specific thread.
    task_thread: Arc<ThreadHandler>,

    /// A task. A weak reference is used in order to determine whether the task was
    /// removed and the exec is no longer necessary.
    ptgi: Weak<PeriodicTaskGuardInner>,
}
137
138impl NewTaskTicket
139{
140    fn new(task_thread: Arc<ThreadHandler>, ptgi: Weak<PeriodicTaskGuardInner>) -> Self
141    {
142        return 
143            Self
144            {
145                task_thread: 
146                    task_thread,
147                ptgi:
148                    ptgi
149            };
150    }
151
152    /// Sends the task on exec to the local queue of the thread.
153    fn send_task(this: Arc<NewTaskTicket>, task_rep_count: u64, thread_hndl_cnt: usize)
154    {
155        let strong_cnt = Arc::strong_count(&this);
156        let thread = this.task_thread.clone();
157
158        for _ in 0..task_rep_count
159        {
160            thread.send_task(this.clone(), strong_cnt < 2 && thread_hndl_cnt > 1);
161        }
162    }
163}
164
/// The internal structure which is stored in pair with the timer's FD. Contains the necessary 
/// references and values.
#[derive(Debug)]
pub(crate) struct PeriodicTaskTicket
{
    /// A task name.
    task_name: String,

    /// The task's timer, registered in the timer poll.
    sync_timer: PolledTimerFd<TimerFd>,

    /// A time which was set to the timer. Needed in case the timer was cancelled by the OS 
    /// or suspended.
    ptt: PeriodicTaskTime,

    /// A [Weak] reference to the current [NewTaskTicket] which identifies to which thread the
    /// task exec was assigned. The [Arc] of [NewTaskTicket] is cloned for each task exec round.
    /// If it is not `upgradable` the task is no longer assigned to a thread.
    weak_ticket: Weak<NewTaskTicket>,

    /// A [Weak] reference to the [PeriodicTaskGuardInner]. If this reference is not `upgradable` 
    /// then the task is no longer valid.
    ptg: Weak<PeriodicTaskGuardInner>,
}
187
188impl Ord for PeriodicTaskTicket 
189{
190    fn cmp(&self, other: &Self) -> cmp::Ordering 
191    {
192        return 
193            self
194                .sync_timer
195                .get_inner()
196                .as_raw_fd()
197                .cmp(&other.sync_timer.get_inner().as_raw_fd());
198    }
199}
200
201impl PartialOrd for PeriodicTaskTicket 
202{
203    fn partial_cmp(&self, other: &Self) -> Option<cmp::Ordering> 
204    {
205        return Some(self.cmp(other));
206    }
207}
208
209impl PartialEq for PeriodicTaskTicket 
210{
211    fn eq(&self, other: &Self) -> bool 
212    {
213        return 
214            self.task_name == other.task_name && 
215            self.sync_timer.get_inner().as_raw_fd() == other.sync_timer.get_inner().as_raw_fd();
216    }
217}
218
// Marker impl: declares the equality provided by the `PartialEq` impl to be a full
// equivalence relation.
impl Eq for PeriodicTaskTicket {}
220
221impl PeriodicTaskTicket
222{
223    fn new(task_name: String, ptt: PeriodicTaskTime, ptg: Arc<PeriodicTaskGuardInner>, poll: &TimerPoll) -> TimerResult<Self>
224    {
225        // create timer and add to poll
226        let sync_timer = 
227            TimerFd::new(task_name.clone().into(), TimerType::CLOCK_REALTIME, 
228                TimerFlags::TFD_CLOEXEC | TimerFlags::TFD_NONBLOCK)
229                .map_err(|e|
230                    map_timer_err!(TimerErrorType::TimerError(e.get_errno()), "cannot setup timer for task: '{}'", task_name)
231                )
232                .and_then(|timer| 
233                    poll.add(timer)
234                )?;
235
236        let ptt = 
237            Self
238            {
239                task_name: task_name,
240                sync_timer: sync_timer,
241                ptt: ptt,
242                weak_ticket: Weak::new(),
243                ptg: Arc::downgrade(&ptg),
244            };
245
246        ptt.set_timer()?;
247
248        return Ok(ptt);
249    }
250
251    #[inline]
252    fn set_timer(&self) -> TimerResult<()>
253    {
254        return self.get_timer_time().set_timer(self.sync_timer.get_inner());
255    }
256
257    #[inline]
258    fn unset_timer(&self) -> TimerResult<()>
259    {
260        return 
261            self
262                .sync_timer
263                .get_inner()
264                .get_timer()
265                .unset_time()
266                .map_err(|e|
267                    map_timer_err!(TimerErrorType::TimerError(e.get_errno()), "unsetting timer '{}' returned error: {}", self.sync_timer, e)
268                );
269    }
270
271    fn update_task_time(&mut self, ptt_new: PeriodicTaskTime) -> TimerResult<()>
272    {
273        self.ptt = ptt_new;
274
275        return self.set_timer();
276    } 
277
278    fn get_task_guard(&self) -> TimerResult<Arc<PeriodicTaskGuardInner>>
279    {
280        return 
281            self
282                .ptg
283                .upgrade()
284                .ok_or_else(||
285                    map_timer_err!(TimerErrorType::ReferenceGone, "task: '{}' reference to timer has gone", 
286                        self.task_name)
287                );
288    }
289
290    #[inline]
291    fn get_timer_time(&self) -> &PeriodicTaskTime
292    {
293        return &self.ptt;
294    }
295}
296
/// An instance which is returned by the [SyncPeriodicTasks::add] which guards the 
/// task and allows to control its state. If the task is no longer needed the instance
/// can be dropped and the task will be removed from the system.
#[derive(Debug)]
pub struct PeriodicTaskGuard
{
    /// A task name
    task_name: String,

    /// An [Arc] to [PeriodicTaskGuardInner] which contains the task. 
    /// Normally this is the base instance i.e. reference and if it is dropped
    /// it indicates to the task `executor` that the instance is no longer valid.
    /// The autoremoval is performed by sending the `command` over the global
    /// queue.
    /// 
    /// The [Option] is needed to `take` the instance.
    guard: Option<Arc<PeriodicTaskGuardInner>>,

    /// A cloned instance of the executor spawner which contains the reference 
    /// to the channel.
    spt: Arc<SyncPeriodicTasksInner>
}
319
320impl Drop for PeriodicTaskGuard
321{
322    fn drop(&mut self) 
323    {
324        let guard = self.guard.take().unwrap();
325
326        let _ = self.spt.send_global_cmd(GlobalTasks::RemoveTask(guard, None));
327
328        return;
329    }
330}
331
332impl PeriodicTaskGuard
333{
334    /// Requests the task rescheduling - changine the timer time or mode. 
335    /// The `task` is represented by the calling instance. 
336    /// 
337    /// This function blocks
338    /// the current thread for the maximum (in worst case) 5 seconds which is a 
339    /// timeout for feedback reception.
340    /// 
341    /// # Arguments
342    /// 
343    /// * `ptt` - a new time to be set to timer.
344    /// 
345    /// # Returns 
346    /// 
347    /// A [Result] as alias [TimerResult] is returned.
348    
349    /// In case if error is retuned, the operation should be considered as not completed
350    /// correctly and the executor instance is poisoned i.e a bug happened.
351    /// 
352    /// The common errors may be retuned:
353    /// 
354    /// * [TimerErrorType::MpscTimeout] - a feedback (with the result) reception timeout.
355    ///     Probably this is because the task executor is too busy.
356    /// 
357    /// * [TimerErrorType::TimerError] - with the timer error.
358    /// 
359    /// * [TimerErrorType::NotFound] - if instance was not found in the internal records. 
360    ///     Should not happen. If appears, then probably this is a bug.
361    pub 
362    fn reschedule_task(&self, ptt: PeriodicTaskTime) -> TimerResult<()>
363    {
364        let weak_ptgi = Arc::downgrade(self.guard.as_ref().unwrap());
365
366        let (snd, rcv) = mpsc::channel::<Result<(), TimerError>>();
367
368        self.spt.send_global_cmd(GlobalTasks::ReschedTask(weak_ptgi, ptt, Some(snd)))?;
369
370        return 
371            rcv
372                .recv_timeout(Duration::from_secs(10))
373                .map_err(|e|
374                    map_timer_err!(TimerErrorType::MpscTimeout, "reschedule_task(), task name: '{}', MPSC rcv timeout error: '{}'", 
375                        self.task_name, e)
376                )?;
377    }
378
379    /// Requests to suspent the current `task`.
380    /// 
381    /// This function blocks
382    /// the current thread for the maximum (in worst case) 5 seconds which is a 
383    /// timeout for feedback reception.
384    /// 
385    /// # Returns 
386    /// 
387    /// A [Result] as alias [TimerResult] is returned.
388    /// 
389    /// In case if error is retuned, the operation should be considered as not completed
390    /// correctly and the executor instance is poisoned i.e a bug happened.
391    /// 
392    /// The common errors may be retuned:
393    /// 
394    /// * [TimerErrorType::MpscTimeout] - a feedback (with the result) reception timeout.
395    ///     Probably this is because the task executor is too busy.
396    /// 
397    /// * [TimerErrorType::TimerError] - with the timer error.
398    pub 
399    fn suspend_task(&self) -> TimerResult<()>
400    {
401        let weak_ptgi = Arc::downgrade(self.guard.as_ref().unwrap());
402
403        let (snd, rcv) = mpsc::channel::<Result<(), TimerError>>();
404
405        self.spt.send_global_cmd(GlobalTasks::SuspendTask(weak_ptgi, Some(snd)))?;
406
407        return 
408            rcv
409                .recv_timeout(Duration::from_secs(10))
410                .map_err(|e|
411                    map_timer_err!(TimerErrorType::MpscTimeout, "suspend_task(), task name: '{}', MPSC rcv timeout error: '{}'", 
412                        self.task_name, e)
413                )?;
414    }
415
416    /// Requests to resume the task from the suspend state.
417    /// 
418    /// This function blocks
419    /// the current thread for the maximum (in worst case) 5 seconds which is a 
420    /// timeout for feedback reception.
421    /// 
422    /// # Returns 
423    /// 
424    /// A [Result] as alias [TimerResult] is returned.
425    /// 
426    /// In case if error is retuned, the operation should be considered as not completed
427    /// correctly and the executor instance is poisoned i.e a bug happened.
428    /// 
429    /// The common errors may be retuned:
430    /// 
431    /// * [TimerErrorType::MpscTimeout] - a feedback (with the result) reception timeout.
432    ///     Probably this is because the task executor is too busy.
433    /// 
434    /// * [TimerErrorType::TimerError] - with the timer error.
435    pub 
436    fn resume_task(&self) -> TimerResult<()>
437    {
438        let weak_ptgi = Arc::downgrade(self.guard.as_ref().unwrap());
439
440        let (snd, rcv) = mpsc::channel::<Result<(), TimerError>>();
441
442        self.spt.send_global_cmd(GlobalTasks::ResumeTask(weak_ptgi, Some(snd)))?;
443
444        return 
445            rcv
446                .recv_timeout(Duration::from_secs(10))
447                .map_err(|e|
448                    map_timer_err!(TimerErrorType::MpscTimeout, "resume_task(), task name: '{}', MPSC rcv timeout error: '{}'", 
449                        self.task_name, e)
450                )?;
451    }
452}
453
/// Programs the task's timer to a specific time and mode. This instance is
/// a wrapper around the [TimerExpMode], as that struct requires a generic to
/// be specified which defines the type of the time.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum PeriodicTaskTime
{
    /// A provided time is an absolute time in the future.
    Absolute(TimerExpMode<AbsoluteTime>),

    /// A provided time is relative to the current time.
    Relative(TimerExpMode<RelativeTime>),
}
466
467impl From<TimerExpMode<AbsoluteTime>> for PeriodicTaskTime
468{
469    fn from(value: TimerExpMode<AbsoluteTime>) -> Self 
470    {
471        return Self::Absolute(value);
472    }
473}
474
475impl From<TimerExpMode<RelativeTime>> for PeriodicTaskTime
476{
477    fn from(value: TimerExpMode<RelativeTime>) -> Self 
478    {
479        return Self::Relative(value);
480    }
481}
482
483impl fmt::Display for PeriodicTaskTime
484{
485    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result 
486    {
487        match self
488        {
489            Self::Absolute(t) => 
490                write!(f, "{}", t),
491            Self::Relative(t) => 
492                write!(f, "{}", t),
493        }
494    }
495}
496
497impl PeriodicTaskTime
498{
499    /// The timer is set to time up once and for the specific `absolute` time in future 
500    /// (to current realclock time).
501    #[inline]
502    pub 
503    fn exact_time(abs_time: AbsoluteTime) -> Self
504    {
505        return Self::Absolute(TimerExpMode::<AbsoluteTime>::new_oneshot(abs_time));
506    }
507
508    /// The timer is set to time up in the interval for the `relative` time.
509    #[inline]
510    pub 
511    fn interval(rel_time: RelativeTime) -> Self
512    {
513        return Self::Relative(TimerExpMode::<RelativeTime>::new_interval(rel_time));
514    }
515
516    /// The timer is set to time up in the interval for the `relative` time with the initial
517    /// delay.
518    /// 
519    /// # Arguments
520    /// 
521    /// * `start_del_time` - a [RelativeTime] of the initial delay.
522    /// 
523    /// * `rel_int_time` - a [RelativeTime] of the interval.
524    #[inline]
525    pub 
526    fn interval_with_start_delay(start_del_time: RelativeTime, rel_int_time: RelativeTime) -> Self
527    {
528        return Self::Relative(TimerExpMode::<RelativeTime>::new_interval_with_init_delay(start_del_time, rel_int_time));
529    }
530
531    /// Sets the `timer_fd` instance to the specific value stored in the current instance.
532    fn set_timer(&self, timer_fd: &TimerFd) -> TimerResult<()>
533    {
534        match *self
535        {
536            Self::Absolute(timer_exp_mode) => 
537                return 
538                    timer_fd
539                        .get_timer()
540                        .set_time(timer_exp_mode)
541                        .map_err(|e|
542                            map_timer_err!(TimerErrorType::TimerError(e.get_errno()), "cannot set time '{}' for timer: '{}'", timer_exp_mode, timer_fd )
543                        ),
544            Self::Relative(timer_exp_mode) => 
545                return 
546                    timer_fd
547                        .get_timer()
548                        .set_time(timer_exp_mode)
549                        .map_err(|e|
550                            map_timer_err!(TimerErrorType::TimerError(e.get_errno()), "cannot set time '{}' for timer: '{}'", timer_exp_mode, timer_fd )
551                        ),
552        }
553    }
554}
555
556
/// An instance which contains the task and its name.
pub(crate) struct PeriodicTaskGuardInner
{
    /// A task name.
    task_name: String,

    /// A task itself. At the moment it is mutexed in order to provide
    /// the mutability, because the current instance is wrapped into [Arc].
    /// But, the task is never executed from different threads and the mutex
    /// should be replaced with something that is [Send] and can provide
    /// a mutable reference.
    task: Mutex<PeriodicTaskHndl>,
}
569
570impl fmt::Debug for PeriodicTaskGuardInner
571{
572    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result 
573    {
574        f.debug_struct("PeriodicTaskGuardInner").field("task_name", &self.task_name).finish()
575    }
576}
577
578impl fmt::Display for PeriodicTaskGuardInner
579{
580    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result 
581    {
582        write!(f, "task name: '{}'", self.task_name)
583    }
584}
585
586impl PeriodicTaskGuardInner
587{
588    fn new(task_name: String, task_inst: PeriodicTaskHndl) -> TimerResult<Self>
589    {
590        return Ok(Self {  task_name: task_name, task: Mutex::new(task_inst) });
591    }
592
593
594}
595
/// Internal structure for every worker thread.
struct ThreadWorker
{
    /// A thread name, not task name.
    thread_name: String,

    /// An [Injector] queue for the global tasks received from other threads.
    global_task_injector: Arc<Injector<GlobalTasks>>,

    /// An [Injector] queue for local (specific to current thread) tasks.
    local_thread_inj: Arc<Injector<ThreadTask>>,

    /// A flag which is used to control the thread's loop. Is set to `false`
    /// when the thread should exit.
    thread_run_flag: Arc<AtomicBool>,

    /// A shared instance of the [SharedPeriodicTasks] which holds all information
    /// about tasks and FD polling. The thread which acquired the mutex first will
    /// process the `global_task_injector` queue and poll the timers for the events which
    /// will be shared with the other threads.
    spti: Arc<Mutex<SharedPeriodicTasks>>,

    /// A last thread to which the task was assigned.
    thread_last_id: usize,

    /// A poll interrupt signal sender.
    poll_int: PollInterrupt
}
624
625impl ThreadWorker
626{
627    fn new(thread_name: String, global_task_injector: Arc<Injector<GlobalTasks>>, 
628        spti: Arc<Mutex<SharedPeriodicTasks>>, poll_int: PollInterrupt) -> TimerResult<ThreadHandler>
629    {
630        let local_thread_inj = Arc::new(Injector::<ThreadTask>::new());
631        let thread_run_flag = Arc::new(AtomicBool::new(true));
632        let thread_run_flag_weak = Arc::downgrade(&thread_run_flag);
633
634        let worker = 
635            ThreadWorker
636            {
637                thread_name: 
638                    thread_name.clone(),
639                global_task_injector: 
640                    global_task_injector,
641                local_thread_inj:
642                    local_thread_inj.clone(),
643                thread_run_flag: 
644                    thread_run_flag,
645                spti:
646                    spti,
647                thread_last_id:
648                    0,
649                poll_int:
650                    poll_int
651            };
652
653        let thread_hndl =
654            std::thread::Builder::new()
655                .name(thread_name)
656                .spawn(|| worker.worker())
657                .map_err(|e|
658                    map_timer_err!(TimerErrorType::SpawnError(e.kind()), "{}", e)
659                )?;
660            
661
662        return Ok( ThreadHandler::new(thread_hndl, local_thread_inj, thread_run_flag_weak) );
663    }
664
665
666    fn worker(mut self) -> TimerResult<()>
667    {
668        // force to park to allow the main thread to initialize everything
669        std::thread::park();
670
671        while self.thread_run_flag.load(Ordering::Acquire) == true
672        {
673            // check local queue for the task or token
674            while let Steal::Success(task) = self.local_thread_inj.steal()
675            {
676                match task
677                {
678                    ThreadTask::TaskExec(task_exec) =>
679                    {
680                        let Some(ptgi) = task_exec.ptgi.upgrade()
681                            else
682                            {   
683                                // task was removed while was in queue.
684                                continue;
685                            };
686
687                        // call task
688                        match ptgi.task.lock().unwrap().exec()
689                        {
690                            PeriodicTaskResult::Ok => 
691                                {},
692                            PeriodicTaskResult::CancelTask => 
693                            {
694                                self
695                                    .global_task_injector
696                                    .push(GlobalTasks::SuspendTask(task_exec.ptgi.clone(), None));
697
698                                let _ = self.poll_int.aquire().map(|v| v.interrupt_drop());
699                            },
700                            PeriodicTaskResult::TaskReSchedule(ptt) => 
701                            {
702                                self
703                                    .global_task_injector
704                                    .push(GlobalTasks::ReschedTask(task_exec.ptgi.clone(), ptt, None));
705
706                                let _ = self.poll_int.aquire().map(|v| v.interrupt_drop());
707                            }
708                        }
709
710                        drop(task_exec);
711                    }
712                }
713            }
714
715            let spti_lock_res =  self.spti.try_lock();
716
717            if let Ok(mut task_token) = spti_lock_res
718            {
719                let thread_hndl_cnt = task_token.thread_pool.get().unwrap().len();
720
721                // check global task queue
722                while let Steal::Success(task) = self.global_task_injector.steal()
723                {
724                    match task
725                    {
726                        GlobalTasks::AddTask(task_name, ptt, ptg, opt_err_ret) => 
727                        {
728                            if task_token.contains_task(&task_name) == true
729                            {
730                                if let Some(err_ret) = opt_err_ret
731                                {
732                                    let err_msg = 
733                                        map_timer_err!(TimerErrorType::Duplicate, 
734                                            "thread: '{}', task: '{}' already exists", self.thread_name, task_name);
735
736                                    let _ = err_ret.send(Err(err_msg));
737                                }
738
739                                continue;
740                            }
741
742                            let period_task_ticket = 
743                                PeriodicTaskTicket::new(task_name.clone(), ptt, ptg, &task_token.timers_poll);
744
745                            if let Err(e) = period_task_ticket 
746                            {
747                                if let Some(err_ret) = opt_err_ret
748                                {
749                                    let _ = err_ret.send(Err(e));
750                                }
751
752                                continue;
753                            }
754
755                            let period_task_ticket = period_task_ticket.unwrap();
756                            
757                            task_token.insert_task(period_task_ticket);
758                            
759                            if let Some(err_ret) = opt_err_ret
760                            {
761                                let _ = err_ret.send(Ok(()));
762                            }
763                        },
764                        GlobalTasks::RemoveTask(ptg_arc, opt_err_ret) => 
765                        {
766                            let res = task_token.remove_task(&ptg_arc.task_name);
767
768                            if let Err(e) = res
769                            {
770                                if let Some(err_ret) = opt_err_ret
771                                {
772                                    let err_msg = 
773                                        map_timer_err!(e.get_error_type(), 
774                                            "thread: '{}', {}", self.thread_name, e.get_error_msg());
775
776                                    let _ = err_ret.send(Err(err_msg));
777                                }
778
779                                continue;
780                            }
781
782                            let ptt_ref = res.unwrap();
783
784                            //let ptt = ptt_ref.into_inner();
785
786                            // stop timer
787                            if let Err(e) = ptt_ref.unset_timer()
788                            {
789                                if let Some(err_ret) = opt_err_ret.as_ref()
790                                {
791                                    let _ = err_ret.send(Err(e));
792                                }
793
794                                continue;
795                            }
796
797                            drop(ptt_ref);
798
799                            /*if let Some(err_ret) = opt_err_ret
800                            {
801                                let _ = err_ret.send(Ok(()));
802                            }
803
804                            if let Some(ptt_ref) = task_token.tasks.remove(&ptg_arc.task_name)
805                            {
806                                
807                            }
808                            else
809                            {
810                                if let Some(err_ret) = opt_err_ret
811                                {
812                                    let err_msg = 
813                                        map_timer_err!(TimerErrorType::NotFound, 
814                                            "thread: '{}', task: '{}' was not found", self.thread_name, ptg_arc.task_name);
815
816                                    let _ = err_ret.send(Err(err_msg));
817                                }
818                            }*/
819
820                            drop(ptg_arc);
821                        },
822                        GlobalTasks::ReschedTask( ptg_weak, ptt, opt_err_ret ) =>
823                        {
824                            let Some(ptg_arc) = ptg_weak.upgrade()
825                                else
826                                {   
827                                    // task was removed while was in queue.
828                                    continue;
829                                };
830
831                            // replace the time
832                            /*let res_task = 
833                                task_token
834                                    .tasks
835                                    .get(&ptg_arc.task_name)
836                                    .ok_or_else(||
837                                        map_timer_err!(TimerErrorType::NotFound, 
838                                            "thread: '{}', task: '{}' was not found", self.thread_name, ptg_arc.task_name)
839                                    );*/
840
841                            let res_task = task_token.get_task_by_name(&ptg_arc.task_name);
842
843                            let res = 
844                                match res_task
845                                {
846                                    Ok(task) => 
847                                    {
848                                        // stop timer
849                                        let _ = task.unset_timer();
850
851                                        // update timer value and set timer
852                                        let res = task.update_task_time(ptt);
853
854                                        if let Err(e) = res
855                                        {
856                                            if let Some(err_ret) = opt_err_ret.as_ref()
857                                            {
858                                                let _ = err_ret.send(Err(e));
859                                            }
860
861                                            continue;
862                                        }
863
864                                        Ok(())
865                                    },
866                                    Err(err) => 
867                                    {
868                                        Err(err)
869                                    }
870                                };
871
872                            if let Some(err_ret) = opt_err_ret
873                            {
874                                let _ = err_ret.send(res);
875                            }
876                        },
877                        GlobalTasks::SuspendTask(ptg_weak, opt_err_ret) =>
878                        {
879                            let Some(ptg_arc) = ptg_weak.upgrade()
880                                else
881                                {   
882                                    // task was removed while was in queue.
883                                    continue;
884                                };
885
886                            /*let res_task = 
887                                task_token
888                                    .tasks
889                                    .get(&ptg_arc.task_name)
890                                    .ok_or_else(||
891                                        map_timer_err!(TimerErrorType::NotFound, 
892                                            "thread: '{}', task: '{}' was not found", self.thread_name, ptg_arc.task_name)
893                                    );*/
894
895                            let res_task = task_token.get_task_by_name(&ptg_arc.task_name);
896
897                            if let Err(e) = res_task
898                            {
899                                if let Some(err_ret) = opt_err_ret.as_ref()
900                                {
901                                    let _ = err_ret.send(Err(e));
902                                }
903
904                                continue;
905                            }
906
907                            let task = res_task.unwrap();
908
909                            let res = task.unset_timer();
910
911                            if let Some(err_ret) = opt_err_ret
912                            {
913                                let _ = err_ret.send(res);
914                            }
915                        },
916                        GlobalTasks::ResumeTask(ptg_weak, opt_err_ret) =>
917                        {
918                            let Some(ptg_arc) = ptg_weak.upgrade()
919                                else
920                                {   
921                                    // task was removed while was in queue.
922                                    continue;
923                                };
924
925                            /*let res_task = 
926                                task_token
927                                    .tasks
928                                    .get(&ptg_arc.task_name)
929                                    .ok_or_else(||
930                                        map_timer_err!(TimerErrorType::NotFound, 
931                                            "thread: '{}', timer with FD: '{}' was not found", self.thread_name, ptg_arc.task_name)
932                                    );*/
933
934                            let res_task = task_token.get_task_by_name(&ptg_arc.task_name);
935
936                            if let Err(e) = res_task
937                            {
938                                if let Some(err_ret) = opt_err_ret.as_ref()
939                                {
940                                    let _ = err_ret.send(Err(e));
941                                }
942
943                                continue;
944                            }
945
946                            let res = res_task.unwrap().set_timer();
947
948                            if let Some(err_ret) = opt_err_ret.as_ref()
949                            {
950                                let _ = err_ret.send(res);
951                            }
952                        }
953                    }
954                }
955
956                // poll
957                let Some(res) = 
958                    task_token
959                        .timers_poll
960                        .poll(Some(5000))?
961                else
962                    {
963                        continue;
964                    };
965
966
967                for event in res
968                {
969                    match event
970                    {
971                        PollEventType::TimerRes(timer_fd, timer_res) => 
972                        {
973                            // resolve fd into task
974                            /*let task = 
975                                task_token
976                                    .tasks
977                                    .get(&timer_fd)
978                                    .ok_or_else(||
979                                        map_timer_err!(TimerErrorType::NotFound, 
980                                            "thread: '{}', timer with FD: '{}' was not found", self.thread_name, timer_fd)
981                                    )?;*/
982                            
983                            let (ptg, ptt, ticket_arc_opt, task_name) = 
984                            {
985                                let task = 
986                                    task_token
987                                        .get_task_by_fd(timer_fd)
988                                        .map_err(|e|
989                                            map_timer_err!(e.get_error_type(), "thread: '{}', {}", self.thread_name, e.get_error_msg())
990                                        )?;
991
992                                    (task.ptg.clone(), task.ptt.clone(), task.weak_ticket.upgrade(), task.task_name.clone())
993                            };
994
995                            /*let ptg = task.ptg.clone();
996                            let ptt = task.ptt.clone();
997                            // check if ticket presents, if not create
998                            let ticket_arc_opt = task.weak_ticket.upgrade();
999                            let task_name = task.task_name.clone();*/
1000
1001                  
1002
1003                            // check that PeriodicTaskGuard is ok
1004                            let Some(ptg_arc) = ptg.upgrade()
1005                                else
1006                                {
1007                                    //let task_name = task.task_name.clone();
1008
1009                                    // if Arc cannot be upgraded, then remove the task as removed
1010                                    //task_token.tasks.remove(&timer_fd);
1011                                    let _ = task_token.remove_task(&task_name);
1012
1013                                    // continue event handling
1014                                    continue;
1015                                };
1016
1017                            // check timer result
1018                            let overflow_cnt: u64 = 
1019                                match timer_res
1020                                {
1021                                    TimerReadRes::Ok(overfl) => 
1022                                    {
1023                                        overfl
1024                                    },
1025                                    TimerReadRes::Cancelled => 
1026                                    {
1027                                        // the system time was modified, resched instance
1028
1029                                        self
1030                                            .global_task_injector
1031                                            .push(
1032                                                GlobalTasks::ReschedTask(ptg.clone(), ptt.clone(), None)
1033                                            );
1034
1035                                        // timer was reset continue
1036                                        continue;
1037                                    },
1038                                    TimerReadRes::WouldBlock => 
1039                                    {
1040                                        // should not happen, silently ignore or panic?
1041                                        panic!("assertion trap: timer retuned WouldBlock, {}", ptg_arc);
1042                                    }
1043                                };
1044
1045                            // check if ticket presents, if not create
1046                           // let ticket_arc_opt = task.weak_ticket.upgrade();
1047
1048
1049                            let ticket = 
1050                                match ticket_arc_opt
1051                                {
1052                                    Some(ticket) => 
1053                                        ticket,
1054                                    None => 
1055                                    {    
1056                                        let task_thread = 
1057                                            {
1058                                                /*let thread_local_hnd = 
1059                                                    task_token
1060                                                        .thread_pool
1061                                                        .get()
1062                                                        .unwrap()[self.thread_last_id]
1063                                                        .get_thread_handler();*/
1064
1065                                               
1066
1067                                                self.thread_last_id = (self.thread_last_id + 1) % thread_hndl_cnt;
1068
1069                                                //thread_local_hnd
1070                                                task_token.clone_thread_handler(self.thread_last_id)
1071                                            }; 
1072
1073                                        let ticket =
1074                                                Arc::new(NewTaskTicket::new(task_thread, ptg.clone()));
1075
1076                                        let task = 
1077                                            task_token
1078                                                .get_task_by_fd(timer_fd)
1079                                                .map_err(|e|
1080                                                    map_timer_err!(e.get_error_type(), "thread: '{}', {}", self.thread_name, e.get_error_msg())
1081                                                )?;
1082                                                
1083                                        task.weak_ticket = Arc::downgrade(&ticket);
1084
1085                                        ticket
1086                                    }
1087                                };
1088                            
1089
1090                            
1091                            NewTaskTicket::send_task(ticket, overflow_cnt, thread_hndl_cnt);
1092                        },
1093                        _ => 
1094                        {
1095                            // ignore
1096                        },
1097                    }
1098                } // for
1099            }
1100            else if let Err(TryLockError::WouldBlock) = spti_lock_res
1101            {
1102                if self.thread_run_flag.load(Ordering::Acquire) == false
1103                {
1104                    return Ok(());
1105                }
1106
1107                if self.local_thread_inj.is_empty() == true
1108                {
1109                    std::thread::park_timeout(Duration::from_secs(2));
1110                }
1111            } 
1112            
1113        } // while
1114
1115        return Ok(());
1116    }
1117}
1118
1119
1120/// A common instance which contains the specific thread handler and
1121/// a [Weak] reference to the loop controlling flag.
1122#[derive(Debug)]
1123struct ThreadHandler
1124{
1125    /// A thread join handler
1126    hndl: JoinHandle<TimerResult<()>>,
1127
1128    /// A local task injector.
1129    task_injector: Arc<Injector<ThreadTask>>,
1130
1131    /// A flag which controls the thread loop. Initialized as `true`.
1132    thread_flag: Weak<AtomicBool>,
1133}
1134
1135impl ThreadHandler
1136{
1137    fn new(hndl: JoinHandle<TimerResult<()>>, task_injector: Arc<Injector<ThreadTask>>, thread_flag: Weak<AtomicBool>) -> Self
1138    {            
1139        return 
1140            Self
1141            {
1142                hndl,
1143                task_injector,
1144                thread_flag: thread_flag
1145            };
1146    }
1147
1148    fn stop(&self)
1149    {
1150        if let Some(v) = self.thread_flag.upgrade()
1151        {
1152            v.store(false, Ordering::Release);
1153        }
1154    }
1155
1156    fn unpark(&self) 
1157    {
1158        self.hndl.thread().unpark();
1159    }
1160
1161    fn send_task(&self, task: Arc<NewTaskTicket>, unpark: bool)
1162    {
1163        self.task_injector.push(ThreadTask::TaskExec(task));
1164
1165        if unpark == true
1166        {
1167            self.hndl.thread().unpark();
1168        }
1169
1170        return;
1171    }
1172
1173    fn clean_local_queue(&self)
1174    {
1175        while let Steal::Success(_) = self.task_injector.steal() {}
1176
1177        return;
1178    }
1179}
1180
1181/// A instance which contains all information about the threads allocated for the
1182/// task execution, tasks and timer polling. An instance is shared with all threads.
1183#[derive(Debug)]
1184pub struct SharedPeriodicTasks
1185{
1186    /// A list of the threads. A [OnceCell] is used to initialize the field once.
1187    thread_pool: OnceCell<Arc<Vec<Arc<ThreadHandler>>>>,
1188
1189    /// A [HashMap] of the regestered tasks. Each task is mapped to its timer FD.
1190    tasks_by_fd: HashMap<RawFd, PeriodicTaskTicket>,
1191
1192    tasks_name_to_fd: HashMap<String, RawFd>,
1193
1194    /// A timer event poll. 
1195    timers_poll: TimerPoll,
1196}
1197
1198
1199impl SharedPeriodicTasks
1200{ 
1201    fn new() -> TimerResult<Self>
1202    {
1203
1204        return Ok( 
1205            Self 
1206            { 
1207                thread_pool: OnceCell::default(),
1208                tasks_by_fd: HashMap::new(),
1209                tasks_name_to_fd: HashMap::new(),
1210                timers_poll: TimerPoll::new()?
1211            }
1212        );
1213    }
1214
1215    fn get_task_by_fd(&mut self, timer_fd: RawFd) -> TimerResult<&mut PeriodicTaskTicket>
1216    {
1217        return 
1218            self
1219                .tasks_by_fd
1220                .get_mut(&timer_fd)
1221                .ok_or_else(||
1222                    map_timer_err!(TimerErrorType::NotFound, "task fd: '{}' was found but task was not found", 
1223                        timer_fd.as_raw_fd())
1224                );
1225    }
1226
1227    fn get_task_by_name(&mut self, task_name: &str) -> TimerResult<&mut PeriodicTaskTicket>
1228    {
1229        let res = 
1230            self
1231                .tasks_name_to_fd
1232                .get(task_name)
1233                .ok_or_else(||
1234                    map_timer_err!(TimerErrorType::NotFound, "task: '{}' was not found", task_name)
1235                )
1236                .map(|v|
1237                    self
1238                        .tasks_by_fd
1239                        .get_mut(v)
1240                        .ok_or_else(||
1241                            map_timer_err!(TimerErrorType::NotFound, "task: '{}' fd: '{}' was found but task was not found", 
1242                                task_name, v)
1243                        )
1244                )??;
1245
1246        return Ok(res);
1247    }
1248
1249    fn clone_thread_handler(&self, thread_last_id: usize) -> Arc<ThreadHandler>
1250    {
1251        let thread_local_hnd = 
1252            self
1253                .thread_pool
1254                .get()
1255                .unwrap()[thread_last_id]
1256                .clone();
1257
1258        return thread_local_hnd;
1259    }
1260
1261    fn remove_task(&mut self, task_name: &str) -> TimerResult<PeriodicTaskTicket>
1262    {
1263        let Some(task_timer_fd) = self.tasks_name_to_fd.remove(task_name)
1264            else
1265            {
1266                timer_err!(TimerErrorType::NotFound, "task: '{}' was not found", task_name);
1267            };
1268
1269        let Some(ptt) = self.tasks_by_fd.remove(&task_timer_fd)
1270            else
1271            {
1272                timer_err!(TimerErrorType::NotFound, "task: '{}' fd: '{}' was found but task was not found", 
1273                    task_name, task_timer_fd);
1274            };
1275
1276        return Ok(ptt);
1277    }
1278
1279    fn contains_task(&self, task_name: &str) -> bool
1280    {
1281        return self.tasks_name_to_fd.contains_key(task_name);
1282    }
1283
1284    fn insert_task(&mut self, period_task_ticket: PeriodicTaskTicket)
1285    {
1286        let raw_fd = period_task_ticket.sync_timer.get_inner().as_raw_fd();
1287
1288        self.tasks_name_to_fd.insert(period_task_ticket.task_name.clone(), raw_fd);
1289        self.tasks_by_fd.insert(raw_fd, period_task_ticket);
1290
1291        return;
1292    }
1293}
1294
1295#[derive(Debug)]
1296pub struct SyncPeriodicTasksInner
1297{
1298    /// A [Weak] reference to the [EventFd] of the timer `poll` which is used to interrupt the polling.
1299    poll_int: PollInterrupt,
1300
1301    /// A `global` FIFO which is used to send commands to the task executor.
1302    task_injector: Arc<Injector<GlobalTasks>>,
1303}
1304
1305impl SyncPeriodicTasksInner
1306{
1307    fn send_global_cmd(&self, glob: GlobalTasks) -> TimerResult<()>
1308    {
1309        let poll_int = 
1310            self.poll_int.aquire()?;
1311                
1312        self.task_injector.push(glob);
1313
1314        poll_int.interrupt_drop()?;
1315
1316        return Ok(());
1317    }
1318
1319    fn clear_global_queue(&self)
1320    {
1321        while let Steal::Success(_) = self.task_injector.steal() {}
1322
1323        return;
1324    }
1325}
1326
1327/// A wrapper for the task which is described in closure.
1328struct SyncCallableOper
1329{
1330    op: Box<dyn FnMut() -> PeriodicTaskResult>,
1331}
1332
1333unsafe impl Send for SyncCallableOper {}
1334
1335impl PeriodicTask for SyncCallableOper
1336{
1337    fn exec(&mut self) -> PeriodicTaskResult 
1338    {
1339        return (self.op)();
1340    }
1341}
1342
1343/// A `main` instance which spawns the task executor.
1344/// 
1345/// ```ignore
1346/// let spt = SyncPeriodicTasks::new(1.try_into().unwrap()).unwrap();
1347/// let task1 = TaskStruct1::new(2, send);
1348/// let task1_ptt = PeriodicTaskTime::Relative(TimerExpMode::<RelativeTime>::new_interval(RelativeTime::new_time(1, 0)));
1349/// let task1_guard = s.add("task1", task1, task1_ptt).unwrap();
1350/// ```
1351/// 
1352/// This instance should be kept somewhere and dropped only if the task executor WITH
1353/// all spawned tasks are no longer needed.
1354#[derive(Debug, Clone)]
1355pub struct SyncPeriodicTasks
1356{
1357    threads: Option<Arc<Vec<Arc<ThreadHandler>>>>,
1358
1359    inner: Arc<SyncPeriodicTasksInner>,
1360}
1361
1362impl Drop for SyncPeriodicTasks
1363{
1364    fn drop(&mut self) 
1365    {
1366        self.inner.clear_global_queue();
1367
1368        let mut threads = self.threads.take().unwrap();
1369
1370        // stop all threads
1371        for thread in threads.iter()
1372        {
1373            thread.stop();
1374            thread.unpark();
1375        }
1376
1377        // interrupt poll
1378        let _ = self.inner.poll_int.aquire().map(|v| v.interrupt_drop());
1379
1380        for _ in 0..5
1381        {
1382            let threads_unwr = 
1383                match Arc::try_unwrap(threads)
1384                {
1385                    Ok(r) => r,
1386                    Err(e) =>
1387                    {
1388                        threads = e;
1389
1390                        std::thread::sleep(Duration::from_millis(500));
1391
1392                        continue;
1393
1394                    }
1395                };
1396
1397            for thread in threads_unwr
1398            {
1399                thread.clean_local_queue();
1400
1401                let Some(thread) = Arc::into_inner(thread)
1402                else
1403                {
1404                    panic!("assertion trap: ~SyncPeriodicTasks, a reference to ThreadHandler left somewhere");
1405                };
1406
1407                let _ = thread.hndl.join();
1408            }
1409
1410            break;
1411        }
1412    }
1413}
1414
1415
1416impl SyncPeriodicTasks
1417{
1418
1419    
1420    /// Creates new instance. An amount of threads allocated for the task executor
1421    /// should be specified. All threads will be started immidiatly. For small
1422    /// tasks one thread will be enough. For a large amount of tasks, especially it
1423    /// tasks are waken up oftenly then at least two threads should be allocated.
1424    /// 
1425    /// # Arguments
1426    /// 
1427    /// * `threads_cnt` - a [NonZeroUsize] amount of threads.
1428    /// 
1429    /// # Returns
1430    /// 
1431    /// The [Result] is returned as alias [TimerResult].
1432    pub 
1433    fn new(threads_cnt: NonZeroUsize) -> TimerResult<Self>
1434    {
1435        let spti = SharedPeriodicTasks::new()?;
1436        let poll_int = spti.timers_poll.get_poll_interruptor();
1437
1438        // wrap into the mutex because this instance is shared 
1439        let spti = Arc::new(Mutex::new(spti));
1440
1441        let task_injector = Arc::new(Injector::<GlobalTasks>::new());
1442
1443        let mut thread_hndls: Vec<Arc<ThreadHandler>> = Vec::with_capacity(threads_cnt.get());
1444
1445        // spawn threads, all spawn threads will be parked by default.
1446        for i in 0..threads_cnt.get()
1447        {
1448            let handler = 
1449                ThreadWorker::new(format!("timer_exec/{}s", i), task_injector.clone(), spti.clone(), poll_int.clone())?;
1450
1451            thread_hndls.push(Arc::new(handler));
1452        }
1453
1454        let thread_hndls = Arc::new(thread_hndls);
1455
1456        // Lock the instance to initialze the `thread_pool` variable.
1457        let spti_lock = spti.lock().unwrap();
1458        spti_lock.thread_pool.get_or_init(|| thread_hndls.clone());
1459
1460        // get a random thread to unpark it and start the process
1461        let thread = 
1462            spti_lock
1463                .thread_pool
1464                .get()
1465                .unwrap()
1466                .get(random_range(0..threads_cnt.get()))
1467                .unwrap()
1468                .clone();
1469
1470        drop(spti_lock);
1471
1472        // unpark thread
1473        thread.unpark();
1474
1475        let inner = 
1476            SyncPeriodicTasksInner
1477            {
1478                poll_int: poll_int,
1479                task_injector: task_injector,
1480            };
1481
1482        return Ok( 
1483            Self 
1484            { 
1485                threads: Some(thread_hndls),
1486                inner: Arc::new(inner),
1487            }
1488        );
1489    }
1490
1491    /// Adds and spawns the task. The task is defined with generic `T` which is any 
1492    /// datatype which implements [PeriodicTask].
1493    /// 
1494    /// # Arguments
1495    /// 
1496    /// * `task_name` - a task name. used only for identification purposes in debug messages.
1497    /// 
1498    /// * `task` - a task which should be executed. It should implenet [PeriodicTask].
1499    /// 
1500    /// * `task_time` - [PeriodicTaskTime] a time when the task must be spawned.
1501    /// 
1502    /// # Returns
1503    /// 
1504    /// The [Result] is returned as alias [TimerResult].
1505    /// 
1506    /// # Example
1507    /// 
1508    /// ```ignore
1509    /// #[derive(Debug)]
1510    /// struct TaskStruct1
1511    /// {
1512    ///     a1: u64,
1513    ///     s: Sender<u64>,
1514    /// }
1515    /// 
1516    /// impl TaskStruct1
1517    /// {
1518    ///     fn new(a1: u64, s: Sender<u64>) -> Self
1519    ///     {
1520    ///         return Self{ a1: a1, s };
1521    ///     }
1522    /// }
1523    /// 
1524    /// impl PeriodicTask for TaskStruct1
1525    /// {
1526    ///     fn exec(&mut self) -> PeriodicTaskResult
1527    ///     {
1528    ///         println!("taskstruct1 val: {}", self.a1);
1529    ///         let _ = self.s.send(self.a1);
1530    ///         return PeriodicTaskResult::Ok;
1531    ///     }
1532    /// }
1533    /// 
1534    /// fn main()
1535    /// {
1536    ///     let s = SyncPeriodicTasks::new(1.try_into().unwrap()).unwrap();
1537    ///     
1538    ///     let (send, recv) = mpsc::channel::<u64>();
1539    /// 
1540    ///     let task1 = TaskStruct1::new(2, send);
1541    /// 
1542    ///     let task1_ptt = PeriodicTaskTime::exact_time(AbsoluteTime::now() + RelativeTime::new_time(3, 0));
1543    /// 
1544    ///     let task1_guard = s.add("task1", task1, task1_ptt).unwrap();
1545    /// 
1546    ///     println!("added");
1547    /// 
1548    ///     let val = recv.recv();
1549    /// 
1550    ///     println!("{:?}", val);
1551    /// 
1552    ///     drop(task1_guard);
1553    /// }
1554    /// ```
1555    pub 
1556    fn add<T>(&self, task_name: impl Into<String>, task: T, task_time: PeriodicTaskTime) -> TimerResult<PeriodicTaskGuard>
1557    where T: PeriodicTask
1558    {
1559        let task_int: PeriodicTaskHndl = Box::new(task);
1560
1561        let task_name_str: String = task_name.into();
1562
1563        let period_task_guard = 
1564            Arc::new(PeriodicTaskGuardInner::new(task_name_str.clone(), task_int)?);
1565
1566
1567       /* let period_task_ticket = 
1568            PeriodicTaskTicket::new(task_name_str.clone(), task_time, Arc::downgrade(&period_task_guard));*/
1569
1570        let (mpsc_send, mpsc_recv) = mpsc::channel();
1571
1572        self.inner.send_global_cmd(GlobalTasks::AddTask(task_name_str.clone(), task_time, period_task_guard.clone(), Some(mpsc_send)) )?;
1573
1574        let _ = 
1575            mpsc_recv
1576                .recv()
1577                .map_err(|e|
1578                    map_timer_err!(TimerErrorType::ExternalError, "mpsc error: {}", e)
1579                )??;
1580
1581        let ret = 
1582            PeriodicTaskGuard
1583            {
1584                task_name: task_name_str,
1585                guard: Some(period_task_guard),
1586                spt: self.inner.clone()
1587            };
1588
1589        return Ok(ret);
1590    }
1591
1592    /// Adds and spawns the task in form of a closure.
1593    /// 
1594    /// # Arguments
1595    /// 
1596    /// * `task_name` - a task name. used only for identification purposes in debug messages.
1597    /// 
1598    /// * `task_time` - [PeriodicTaskTime] a time when the task must be spawned.
1599    /// 
1600    /// * `clo` - a function to exec on timeout. The function must return [PeriodicTaskResult].
1601    /// 
1602    /// # Returns
1603    /// 
1604    /// The [Result] is returned as alias [TimerResult].
1605    /// 
1606    /// # Example
1607    /// 
1608    /// ```ignore
1609    /// 
1610    /// let task1_ptt = PeriodicTaskTime::exact_time(AbsoluteTime::now() + RelativeTime::new_time(3, 0));
1611    /// 
1612    /// let (send, recv) = mpsc::channel::<u64>();
1613    /// 
1614    /// let task1_guard = 
1615    ///     s.add_closure("task2", task1_ptt, 
1616    ///         move || 
1617    ///         { 
1618    ///             println!("test output"); 
1619    ///             send.send(1).unwrap();
1620    /// 
1621    ///             return PeriodicTaskResult::Ok;
1622    ///         }
1623    ///     ).unwrap();
1624    /// 
1625    /// let val = recv.recv();
1626    /// 
1627    /// println!("{:?}", val);
1628    /// ```
1629    pub 
1630    fn add_closure<F>(&self, task_name: impl Into<String>, task_time: PeriodicTaskTime, clo: F) -> TimerResult<PeriodicTaskGuard>
1631    where F: 'static + FnMut() -> PeriodicTaskResult + Send
1632    {
1633        let closure_task = SyncCallableOper{ op: Box::new(clo) };
1634
1635        return self.add(task_name, closure_task, task_time);
1636    }
1637    /// Checks if any thread have crashed and no longer works.
1638    /// 
1639    /// # Returns
1640    /// 
1641    /// A [Option] is retuerned with the inner data:
1642    /// 
1643    /// * [Option::Some] with the thread name [String] that have quit.
1644    /// 
1645    /// * [Option::None] indicating that everthing is fine.
1646    pub 
1647    fn check_thread_status(&self) -> Option<String>
1648    {
1649        for thread in self.threads.as_ref().unwrap().iter()
1650        {
1651            if let None = thread.thread_flag.upgrade()
1652            {
1653                return Some(thread.hndl.thread().name().unwrap().to_string());
1654            }
1655        }
1656
1657        return None;
1658    }
1659}
1660
1661#[cfg(test)]
1662mod tests
1663{
1664    use core::fmt;
1665    use std::{sync::mpsc::{self, RecvTimeoutError, Sender}, time::{Duration, Instant}};
1666
1667    use crate::{periodic_task::sync_tasks::{PeriodicTask, PeriodicTaskResult, PeriodicTaskTime, SyncPeriodicTasks}, AbsoluteTime, RelativeTime};
1668
1669    #[derive(Debug)]
1670    struct TaskStruct1
1671    {
1672        a1: u64,
1673        s: Sender<u64>,
1674    }
1675
1676    impl TaskStruct1
1677    {
1678        fn new(a1: u64, s: Sender<u64>) -> Self
1679        {
1680            return Self{ a1: a1, s };
1681        }
1682    }
1683
1684    impl PeriodicTask for TaskStruct1
1685    {
1686        fn exec(&mut self) -> PeriodicTaskResult
1687        {
1688            println!("taskstruct1 val: {}", self.a1);
1689
1690            let _ = self.s.send(self.a1);
1691
1692            return PeriodicTaskResult::Ok;
1693        }
1694    }
1695
1696    #[derive(Debug)]
1697    struct TaskStruct2
1698    {
1699        a1: u64,
1700        s: Sender<u64>,
1701    }
1702
1703    impl TaskStruct2
1704    {
1705        fn new(a1: u64, s: Sender<u64>) -> Self
1706        {
1707            return Self{ a1: a1, s };
1708        }
1709    }
1710
1711    impl PeriodicTask for TaskStruct2
1712    {
1713        fn exec(&mut self) -> PeriodicTaskResult
1714        {
1715            println!("taskstruct2 val: {}", self.a1);
1716
1717            self.s.send(self.a1).unwrap();
1718
1719            return PeriodicTaskResult::TaskReSchedule(PeriodicTaskTime::exact_time(AbsoluteTime::now() + RelativeTime::new_time(2, 0)));
1720        }
1721    }
1722
1723    impl<F> PeriodicTask for F
1724    where F: 'static + FnMut() + Send + fmt::Debug
1725    {
1726        fn exec(&mut self) -> PeriodicTaskResult 
1727        {
1728            (self)();
1729            return PeriodicTaskResult::Ok;
1730        }
1731    }
1732
1733    #[test]
1734    fn ttt()
1735    {
1736        let s = SyncPeriodicTasks::new(1.try_into().unwrap()).unwrap();
1737
1738        let task1_ptt = PeriodicTaskTime::exact_time(AbsoluteTime::now() + RelativeTime::new_time(3, 0));
1739
1740        let (send, recv) = mpsc::channel::<u64>();
1741 
1742        let task1_guard = 
1743            s.add_closure("task2", task1_ptt, 
1744                move || 
1745                { 
1746                    println!("test output"); 
1747                    send.send(1).unwrap();
1748
1749                    return PeriodicTaskResult::Ok;
1750                }
1751            ).unwrap();
1752
1753
1754        println!("added");
1755        let val = recv.recv();
1756
1757        println!("{:?}", val);
1758
1759        drop(task1_guard);
1760    }
1761
1762    #[test]
1763    fn test1_absolute_simple()
1764    {
1765        let s = SyncPeriodicTasks::new(1.try_into().unwrap()).unwrap();
1766
1767        let (send, recv) = mpsc::channel::<u64>();
1768
1769        let task1 = TaskStruct1::new(2, send);
1770        let task1_ptt = PeriodicTaskTime::exact_time(AbsoluteTime::now() + RelativeTime::new_time(3, 0));
1771        let task1_guard = s.add("task1", task1, task1_ptt).unwrap();
1772
1773        println!("added");
1774        let val = recv.recv();
1775
1776        println!("{:?}", val);
1777
1778        drop(task1_guard);
1779    }
1780
1781    #[test]
1782    fn test1_relative_simple()
1783    {
1784        let s = SyncPeriodicTasks::new(1.try_into().unwrap()).unwrap();
1785
1786        let (send, recv) = mpsc::channel::<u64>();
1787
1788        let task1 = TaskStruct1::new(2, send);
1789        let task1_ptt = PeriodicTaskTime::interval(RelativeTime::new_time(1, 0));
1790        let task1_guard = s.add("task1", task1, task1_ptt).unwrap();
1791        
1792        let mut s = Instant::now();
1793
1794        for i in 0..3
1795        {
1796            let val = recv.recv().unwrap();
1797
1798            let e = s.elapsed();
1799            s = Instant::now();
1800
1801            println!("{}: {:?} {:?} {}", i, val, e, e.as_micros());
1802            
1803            assert!(999000 < e.as_micros() && e.as_micros() < 10001200);
1804            assert_eq!(val, 2);
1805        }
1806
1807        drop(task1_guard);
1808
1809        std::thread::sleep(Duration::from_millis(100));
1810
1811        return;
1812    }
1813    
1814    #[test]
1815    fn test1_relative_resched_to_abs()
1816    {
1817        let s = SyncPeriodicTasks::new(1.try_into().unwrap()).unwrap();
1818
1819        let (send, recv) = mpsc::channel::<u64>();
1820
1821        let task1 = TaskStruct2::new(2, send);
1822        let task1_ptt = PeriodicTaskTime::interval(RelativeTime::new_time(1, 0));
1823        let task1_guard = s.add("task1", task1, task1_ptt).unwrap();
1824
1825        let s = Instant::now();
1826        match recv.recv_timeout(Duration::from_millis(1150))
1827        {
1828            Ok(rcv_a) => 
1829            {
1830                let e = s.elapsed();
1831                println!("{:?} {}", e, e.as_micros());
1832                assert_eq!(rcv_a, 2);
1833                assert!(999051 < e.as_micros() && e.as_micros() < 1000551);
1834            },
1835            Err(RecvTimeoutError::Timeout) => 
1836                panic!("tineout"),
1837            Err(e) =>
1838                panic!("{}", e),
1839        }
1840
1841        let s = Instant::now();
1842        match recv.recv_timeout(Duration::from_millis(2100))
1843        {
1844            Ok(rcv_a) => 
1845            {
1846                let e = s.elapsed();
1847                println!("{:?} {}", e, e.as_micros());
1848                assert_eq!(rcv_a, 2);
1849                assert!(1999642 < e.as_micros() && e.as_micros() < 2000342);
1850            },
1851            Err(RecvTimeoutError::Timeout) => 
1852                panic!("tineout"),
1853            Err(e) =>
1854                panic!("{}", e),
1855        }
1856
1857        let s = Instant::now();
1858        match recv.recv_timeout(Duration::from_millis(2100))
1859        {
1860            Ok(rcv_a) => 
1861            {
1862                let e = s.elapsed();
1863                println!("{:?} {}", e, e.as_micros());
1864                assert_eq!(rcv_a, 2);
1865                assert!(1999642 < e.as_micros() && e.as_micros() < 2000342);
1866            },
1867            Err(RecvTimeoutError::Timeout) => 
1868                panic!("tineout"),
1869            Err(e) =>
1870                panic!("{}", e),
1871        }
1872
1873        drop(task1_guard);
1874
1875        std::thread::sleep(Duration::from_millis(100));
1876
1877        return;
1878    }
1879
1880    #[test]
1881    fn test1_relative_simple_resched()
1882    {
1883        let s = SyncPeriodicTasks::new(1.try_into().unwrap()).unwrap();
1884
1885        let (send, recv) = mpsc::channel::<u64>();
1886
1887        let task1 = TaskStruct1::new(2, send);
1888        let task1_ptt = 
1889            PeriodicTaskTime::interval(
1890                RelativeTime::new_time(1, 0)
1891            );
1892        let task1_guard = s.add("task1", task1, task1_ptt).unwrap();
1893        
1894        let mut s = Instant::now();
1895
1896        for i in 0..3
1897        {
1898            let val = recv.recv().unwrap();
1899
1900            let e = s.elapsed();
1901            s = Instant::now();
1902
1903            println!("{}: {:?} {:?} {}", i, val, e, e.as_micros());
1904            
1905            assert!(999000 < e.as_micros() && e.as_micros() < 10001200);
1906            assert_eq!(val, 2);
1907        }
1908
1909        task1_guard
1910            .reschedule_task(
1911                PeriodicTaskTime::exact_time(AbsoluteTime::now() + RelativeTime::new_time(2, 0))
1912            )
1913            .unwrap();
1914
1915        s = Instant::now();
1916        let val = recv.recv().unwrap();
1917        let e = s.elapsed();
1918
1919        println!("resched: {:?} {:?} {}", val, e, e.as_micros());
1920
1921        assert!(1999000 < e.as_micros() && e.as_micros() < 2000560);
1922
1923        let val = recv.recv_timeout(Duration::from_secs(3));
1924
1925        assert_eq!(val.is_err(), true);
1926        assert_eq!(val.err().unwrap(), RecvTimeoutError::Timeout);
1927
1928        drop(task1_guard);
1929
1930        std::thread::sleep(Duration::from_millis(100));
1931
1932        return;
1933    }
1934
1935    #[test]
1936    fn test1_relative_simple_cancel()
1937    {
1938        let s = SyncPeriodicTasks::new(1.try_into().unwrap()).unwrap();
1939
1940        let (send, recv) = mpsc::channel::<u64>();
1941
1942        let task1 = TaskStruct1::new(0, send.clone());
1943        let task1_ptt = 
1944            PeriodicTaskTime::interval(RelativeTime::new_time(1, 0));
1945
1946        let task2 = TaskStruct1::new(1, send.clone());
1947        let task2_ptt = 
1948            PeriodicTaskTime::interval(RelativeTime::new_time(2, 0));
1949
1950        let task3 = TaskStruct1::new(2, send);
1951        let task3_ptt = 
1952            PeriodicTaskTime::interval(RelativeTime::new_time(0, 500_000_000));
1953
1954        let task1_guard = s.add("task1", task1, task1_ptt).unwrap();
1955        let task2_guard = s.add("task2", task2, task2_ptt).unwrap();
1956        let task3_guard = s.add("task3", task3, task3_ptt).unwrap();
1957
1958        let mut a_cnt: [u8; 3] = [0_u8; 3];
1959
1960        let end = AbsoluteTime::now() + RelativeTime::new_time(5, 100_000_000);
1961
1962        while AbsoluteTime::now() < end
1963        {
1964            match recv.recv_timeout(Duration::from_millis(1))
1965            {
1966                Ok(rcv_a) => 
1967                    a_cnt[rcv_a as usize] += 1,
1968                Err(RecvTimeoutError::Timeout) => 
1969                    continue,
1970                Err(e) =>
1971                    panic!("{}", e),
1972            }
1973
1974            
1975        }
1976
1977        assert_eq!(a_cnt[0], 5);
1978        assert_eq!(a_cnt[1], 2);
1979        assert_eq!(a_cnt[2], 10);
1980
1981        // drop task #3
1982        task3_guard.suspend_task().unwrap();
1983
1984
1985        let end = AbsoluteTime::now() + RelativeTime::new_time(5, 100_000_000);
1986
1987        while AbsoluteTime::now() < end
1988        {
1989            match recv.recv_timeout(Duration::from_millis(1))
1990            {
1991                Ok(rcv_a) => 
1992                    a_cnt[rcv_a as usize] += 1,
1993                Err(RecvTimeoutError::Timeout) => 
1994                    continue,
1995                Err(e) =>
1996                    panic!("{}", e),
1997            }
1998
1999            
2000        }
2001
2002        assert_eq!(a_cnt[0] > 5, true);
2003        assert_eq!(a_cnt[1] > 2, true);
2004        assert!((a_cnt[2] == 10 || a_cnt[2] == 11));
2005
2006        drop(task1_guard);
2007        drop(task2_guard);
2008        drop(task3_guard);
2009
2010        let end = AbsoluteTime::now() + RelativeTime::new_time(5, 100_000_000);
2011
2012        while AbsoluteTime::now() < end
2013        {
2014            match recv.recv_timeout(Duration::from_millis(1))
2015            {
2016                Ok(rcv_a) => 
2017                    a_cnt[rcv_a as usize] += 1,
2018                Err(RecvTimeoutError::Timeout) => 
2019                    continue,
2020                Err(_) =>
2021                    break,
2022            }
2023
2024            
2025        }
2026
2027        assert_eq!(AbsoluteTime::now() < end, true);
2028
2029        
2030
2031        return;
2032    }
2033
    // FreeBSD: this test failed at src/periodic_task/sync_tasks.rs:1784:9 with `left: 6`
2035    #[test]
2036    fn test2_multithread_1()
2037    {
2038        let s = SyncPeriodicTasks::new(2.try_into().unwrap()).unwrap();
2039
2040        let (send, recv) = mpsc::channel::<u64>();
2041
2042        let task1 = TaskStruct1::new(0, send.clone());
2043        let task1_ptt = 
2044            PeriodicTaskTime::interval(RelativeTime::new_time(1, 0));
2045
2046        let task2 = TaskStruct1::new(1, send.clone());
2047        let task2_ptt = 
2048            PeriodicTaskTime::interval(RelativeTime::new_time(2, 0));
2049
2050        let task3 = TaskStruct1::new(2, send.clone());
2051        let task3_ptt = 
2052            PeriodicTaskTime::interval(RelativeTime::new_time(0, 500_000_000));
2053
2054        let task4 = TaskStruct1::new(3, send.clone());
2055        let task4_ptt = 
2056            PeriodicTaskTime::interval(RelativeTime::new_time(0, 200_000_000));
2057
2058        let task5 = TaskStruct1::new(4, send.clone());
2059        let task5_ptt = 
2060            PeriodicTaskTime::exact_time(AbsoluteTime::now() + RelativeTime::new_time(5, 0));
2061
2062        let task1_guard = s.add("task1", task1, task1_ptt).unwrap();
2063        let task2_guard = s.add("task2", task2, task2_ptt).unwrap();
2064        let task3_guard = s.add("task3", task3, task3_ptt).unwrap();
2065        let task4_guard = s.add("task4", task4, task4_ptt).unwrap();
2066        let task5_guard = s.add("task5", task5, task5_ptt).unwrap();
2067
2068
2069        let mut a_cnt: [u8; 5] = [0_u8; 5];
2070
2071        let end = AbsoluteTime::now() + RelativeTime::new_time(5, 500_000_000);
2072
2073        while AbsoluteTime::now() < end
2074        {
2075            match recv.recv_timeout(Duration::from_millis(1))
2076            {
2077                Ok(rcv_a) => 
2078                    a_cnt[rcv_a as usize] += 1,
2079                Err(RecvTimeoutError::Timeout) => 
2080                    continue,
2081                Err(e) =>
2082                    panic!("{}", e),
2083            }
2084
2085            
2086        }
2087
2088        println!("{:?}", a_cnt);
2089
2090        assert!(a_cnt[0] == 5);
2091        assert!(a_cnt[1] == 2);
2092        assert!((a_cnt[2] == 10 || a_cnt[2] == 11));
2093        assert!(a_cnt[3] == 27);
2094        assert!(a_cnt[4] == 1);
2095
2096        task5_guard.reschedule_task(PeriodicTaskTime::exact_time(AbsoluteTime::now() + RelativeTime::new_time(0, 500_000_000))).unwrap();
2097
2098        let end = AbsoluteTime::now() + RelativeTime::new_time(0, 600_000_000);
2099
2100        while AbsoluteTime::now() < end
2101        {
2102            match recv.recv_timeout(Duration::from_millis(1))
2103            {
2104                Ok(rcv_a) => 
2105                    a_cnt[rcv_a as usize] += 1,
2106                Err(RecvTimeoutError::Timeout) => 
2107                    continue,
2108                Err(e) =>
2109                    panic!("{}", e),
2110            }
2111        }
2112
2113        println!("{:?}", a_cnt);
2114        assert!(a_cnt[4] == 2);
2115
2116        drop(task5_guard);
2117        drop(task4_guard);
2118        drop(task3_guard);
2119        drop(task2_guard);
2120
2121        let end = AbsoluteTime::now() + RelativeTime::new_time(2, 1000);
2122
2123        while AbsoluteTime::now() < end
2124        {
2125            match recv.recv_timeout(Duration::from_millis(1))
2126            {
2127                Ok(rcv_a) => 
2128                    a_cnt[rcv_a as usize] += 1,
2129                Err(RecvTimeoutError::Timeout) => 
2130                    continue,
2131                Err(e) =>
2132                    panic!("{}", e),
2133            }
2134        }
2135
2136
2137        println!("{:?}", a_cnt);
2138        assert_eq!(a_cnt[4], 2);
2139        assert_eq!(a_cnt[0], 8);
2140
2141        drop(task1_guard);
2142
2143        std::thread::sleep(Duration::from_millis(10));
2144        return;
2145    }
2146}