timer_deque_rs/periodic_task/
sync_tasks.rs

1/*-
2 * timer-deque-rs - a Rust crate which provides timer and timer queues based on target OS
3 *  functionality.
4 * 
5 * Copyright (C) 2025 Aleksandr Morozov alex@nixd.org
6 *  4neko.org alex@4neko.org
7 * 
8 * The timer-rs crate can be redistributed and/or modified
9 * under the terms of either of the following licenses:
10 *
11 *   1. the Mozilla Public License Version 2.0 (the “MPL”) OR
12 *                     
13 *   2. The MIT License (MIT)
14 *                     
15 *   3. EUROPEAN UNION PUBLIC LICENCE v. 1.2 EUPL © the European Union 2007, 2016
16 */
17
18use std::
19{
20    cell::OnceCell, 
21    cmp, 
22    collections::HashMap, 
23    fmt, 
24    num::NonZeroUsize,
25    sync::
26    {
27        Arc, Mutex, TryLockError, Weak, atomic::{AtomicBool, Ordering}, mpsc::{self, Receiver, Sender}
28    }, 
29    thread::JoinHandle, 
30    time::Duration
31};
32
33use crossbeam_deque::{Injector, Steal};
34use rand::random_range;
35
36use crate::
37{
38    AbsoluteTime, 
39    FdTimerCom, 
40    RelativeTime, 
41    TimerPoll, 
42    TimerReadRes, 
43    error::{TimerError, TimerErrorType, TimerResult}, 
44    map_timer_err, timer_err, 
45    timer_portable::
46    {
47        AsTimerId, PollEventType, PolledTimerFd, TimerExpMode, TimerFlags, TimerId, TimerType, poll::PollInterrupt, timer::TimerFd
48    }
49};
50
51/// A result of the task execution which is returned by the user task.
52/// See description for the each variant.
53#[derive(Debug, Clone, Copy, PartialEq, Eq)]
54pub enum PeriodicTaskResult
55{
56    /// The task exec was ok. No errors.
57    Ok,
58
59    /// Cancels (but not removing) the task due to error or for other reason.
60    /// By returning this result the timer is unset to stop generating events. 
61    /// But, if since the `AbortTask` was received and a timer has timed out again 
62    /// the event will be called again. It is not necessary to return the `AbortTask` again, 
63    /// just return [PeriodicTaskResult::Ok] exiting imidiatly.
64    CancelTask,
65
66    /// A request to reschedule the task to a new time or interval. The `0` argument 
67    /// should contain new time. The timer mode can be changed.
68    TaskReSchedule(PeriodicTaskTime)
69}
70
71/// A trait which should be implemented by the instance which will be added as a `task`.
72/// 
73/// * the `instance` must implement [Send] because it may be moved to other thread.
74/// 
75/// * it is not necessary that the `instance` to impl [Sync] because the `instance` will never
76/// be executed from two different threads and it is already protected by mutex.
77/// 
78/// A `task` should never block the thread i.e call some functions which may block for a large
79/// period of time! 
80pub trait PeriodicTask: Send + 'static
81{
82    /// A task entry point. The task should return a [PeriodicTaskResult].
83    fn exec(&mut self) -> PeriodicTaskResult;
84}
85
86/// An alias for the boxed value.
87pub type PeriodicTaskHndl = Box<dyn PeriodicTask>;
88
89
90/// A global FIFO commands.
91/// 
92/// For all variants, the option [Option] [mpsc::Sender] is used for the error feedback and if
93/// it is set to [Option::None] the error will not be reported.
94#[derive(Debug)]
95pub(crate) enum GlobalTasks
96{
97    /// Adds task to the task system. If error happens it will be reported over the `1` and item `0`
98    /// will not be added to the instance.
99    AddTask( String, PeriodicTaskTime, Arc<PeriodicTaskGuardInner>, Option<mpsc::Sender<TimerResult<()>>> ),
100
101    /// Removes the task from the system.
102    RemoveTask( Arc<PeriodicTaskGuardInner>, Option<mpsc::Sender<TimerResult<()>>> ),
103
104    /// Changes the timer value.
105    ReschedTask( Weak<PeriodicTaskGuardInner>, PeriodicTaskTime, Option<mpsc::Sender<TimerResult<()>>> ),
106
107    /// Suspends the task but does not remove the task. The task which is planned to suspend, if it is already
108    /// in the exec queue, will be executed.
109    SuspendTask( Weak<PeriodicTaskGuardInner>, Option<mpsc::Sender<TimerResult<()>>> ),
110
111    /// Resumes the task by enabling timer.
112    ResumeTask( Weak<PeriodicTaskGuardInner>, Option<mpsc::Sender<TimerResult<()>>> ),
113}
114
115/// A local task queue commands.
116#[derive(Debug)]
117pub enum ThreadTask
118{
119    /// Execute the task.
120    TaskExec( Arc<NewTaskTicket> )
121}
122
123/// A ticket which is assigned to specific task in order to send the same task to the same
124/// thread which has been already assigned. If only one thread is allocated, then this is not
125/// really necessary (will be optimized in future).
126#[derive(Debug)]
127pub struct NewTaskTicket
128{
129    /// A thread handler of the specific thread.
130    task_thread: Arc<ThreadHandler>,
131
132    /// A task. A weak reference is used in order to determine if task was
133    /// removed and the exec is not necessary.
134    ptgi: Weak<PeriodicTaskGuardInner>,
135}
136
137impl NewTaskTicket
138{
139    fn new(task_thread: Arc<ThreadHandler>, ptgi: Weak<PeriodicTaskGuardInner>) -> Self
140    {
141        return 
142            Self
143            {
144                task_thread: 
145                    task_thread,
146                ptgi:
147                    ptgi
148            };
149    }
150
151    /// Sends the task on exec to the local queue of the thread.
152    fn send_task(this: Arc<NewTaskTicket>, task_rep_count: u64, thread_hndl_cnt: usize)
153    {
154        let strong_cnt = Arc::strong_count(&this);
155        let thread = this.task_thread.clone();
156
157        for _ in 0..task_rep_count
158        {
159            thread.send_task(this.clone(), strong_cnt < 2 && thread_hndl_cnt > 1);
160        }
161    }
162}
163
164/// The internal structure which is stored in pair with the timer's FD. Contains necessary references
165/// and values.
166#[derive(Debug)]
167pub(crate) struct PeriodicTaskTicket
168{
169    /// A task name.
170    task_name: String,
171
172    sync_timer: PolledTimerFd<TimerFd>,
173
174    /// A time which was set to timer. Needed in case if timer was cancelled ny OS or suspended.
175    ptt: PeriodicTaskTime,
176
177    /// A [Weak] reference to the current [NewTaskTicket] which identifies to which thread the
178    /// task exec was assigned. The [Arc] of [NewTaskTicket] is clonned for each task exec round.
179    /// If is not `upgradable` the task is no logner assigned to thread.
180    weak_ticket: Weak<NewTaskTicket>,
181
182    /// A [Weak] reference to the [PeriodicTaskGuardInner]. If this reference is not `upgradable` 
183    /// then the task is no logner valid.
184    ptg: Weak<PeriodicTaskGuardInner>,
185}
186
187impl Ord for PeriodicTaskTicket 
188{
189    fn cmp(&self, other: &Self) -> cmp::Ordering 
190    {
191        return 
192            self
193                .sync_timer
194                .get_inner()
195                .as_timer_id()
196                .cmp(&other.sync_timer.get_inner().as_timer_id());
197    }
198}
199
200impl PartialOrd for PeriodicTaskTicket 
201{
202    fn partial_cmp(&self, other: &Self) -> Option<cmp::Ordering> 
203    {
204        return Some(self.cmp(other));
205    }
206}
207
208impl PartialEq for PeriodicTaskTicket 
209{
210    fn eq(&self, other: &Self) -> bool 
211    {
212        return 
213            self.task_name == other.task_name && 
214            self.sync_timer.get_inner().as_timer_id() == other.sync_timer.get_inner().as_timer_id();
215    }
216}
217
218impl Eq for PeriodicTaskTicket {}
219
220impl PeriodicTaskTicket
221{
222    fn new(task_name: String, ptt: PeriodicTaskTime, ptg: Arc<PeriodicTaskGuardInner>, poll: &TimerPoll) -> TimerResult<Self>
223    {
224        // create timer and add to poll
225        let sync_timer = 
226            TimerFd::new(task_name.clone().into(), TimerType::CLOCK_REALTIME, 
227                TimerFlags::TFD_CLOEXEC | TimerFlags::TFD_NONBLOCK)
228                .map_err(|e|
229                    map_timer_err!(TimerErrorType::TimerError(e.get_errno()), "cannot setup timer for task: '{}'", task_name)
230                )
231                .and_then(|timer| 
232                    poll.add(timer)
233                )?;
234
235        let ptt = 
236            Self
237            {
238                task_name: task_name,
239                sync_timer: sync_timer,
240                ptt: ptt,
241                weak_ticket: Weak::new(),
242                ptg: Arc::downgrade(&ptg),
243            };
244
245        ptt.set_timer()?;
246
247        return Ok(ptt);
248    }
249
250    #[inline]
251    fn set_timer(&self) -> TimerResult<()>
252    {
253        return self.get_timer_time().set_timer(self.sync_timer.get_inner());
254    }
255
256    #[inline]
257    fn unset_timer(&self) -> TimerResult<()>
258    {
259        return 
260            self
261                .sync_timer
262                .get_inner()
263                .get_timer()
264                .unset_time()
265                .map_err(|e|
266                    map_timer_err!(TimerErrorType::TimerError(e.get_errno()), "unsetting timer '{}' returned error: {}", self.sync_timer, e)
267                );
268    }
269
270    fn update_task_time(&mut self, ptt_new: PeriodicTaskTime) -> TimerResult<()>
271    {
272        self.ptt = ptt_new;
273
274        return self.set_timer();
275    } 
276
277    fn get_task_guard(&self) -> TimerResult<Arc<PeriodicTaskGuardInner>>
278    {
279        return 
280            self
281                .ptg
282                .upgrade()
283                .ok_or_else(||
284                    map_timer_err!(TimerErrorType::ReferenceGone, "task: '{}' reference to timer has gone", 
285                        self.task_name)
286                );
287    }
288
289    #[inline]
290    fn get_timer_time(&self) -> &PeriodicTaskTime
291    {
292        return &self.ptt;
293    }
294}
295
296/// An instance which is returned by the [SyncPeriodicTasks::add] whuch guards the 
297/// task and allows to control its state. If task is no longer needed the instance
298/// can be dropped and it will be removed from the system.
299#[derive(Debug)]
300pub struct PeriodicTaskGuard
301{
302    /// A task name
303    task_name: String,
304
305    /// A [Arc] to [PeriodicTaskGuardInner] which contains timer and other things. 
306    /// Normally this is the base instance i.e reference and if it is dropped
307    /// it indicates to the task `executor` that the instance is no logner valid.
308    /// The autoremoval is perfomed via sending the `command` over the global
309    /// qeueu.
310    /// 
311    /// The [Option] is needed to `take` the instance.
312    guard: Option<Arc<PeriodicTaskGuardInner>>,
313
314    /// A clonned instance to the executor spawner which contains the reference 
315    /// to channel.
316    spt: Arc<SyncPeriodicTasksInner>
317}
318
319impl Drop for PeriodicTaskGuard
320{
321    fn drop(&mut self) 
322    {
323        let guard = self.guard.take().unwrap();
324
325        let _ = self.spt.send_global_cmd(GlobalTasks::RemoveTask(guard, None));
326
327        return;
328    }
329}
330
331impl PeriodicTaskGuard
332{
333    /// Requests the task rescheduling - changine the timer time or mode. 
334    /// The `task` is represented by the calling instance. 
335    /// 
336    /// This function blocks
337    /// the current thread for the maximum (in worst case) 5 seconds which is a 
338    /// timeout for feedback reception.
339    /// 
340    /// # Arguments
341    /// 
342    /// * `ptt` - a new time to be set to timer.
343    /// 
344    /// # Returns 
345    /// 
346    /// A [Result] as alias [TimerResult] is returned.
347    
348    /// In case if error is retuned, the operation should be considered as not completed
349    /// correctly and the executor instance is poisoned i.e a bug happened.
350    /// 
351    /// The common errors may be retuned:
352    /// 
353    /// * [TimerErrorType::MpscTimeout] - a feedback (with the result) reception timeout.
354    ///     Probably this is because the task executor is too busy.
355    /// 
356    /// * [TimerErrorType::TimerError] - with the timer error.
357    /// 
358    /// * [TimerErrorType::NotFound] - if instance was not found in the internal records. 
359    ///     Should not happen. If appears, then probably this is a bug.
360    pub 
361    fn reschedule_task(&self, ptt: PeriodicTaskTime) -> TimerResult<()>
362    {
363        let weak_ptgi = Arc::downgrade(self.guard.as_ref().unwrap());
364
365        let (snd, rcv) = mpsc::channel::<Result<(), TimerError>>();
366
367        self.spt.send_global_cmd(GlobalTasks::ReschedTask(weak_ptgi, ptt, Some(snd)))?;
368
369        return 
370            rcv
371                .recv_timeout(Duration::from_secs(10))
372                .map_err(|e|
373                    map_timer_err!(TimerErrorType::MpscTimeout, "reschedule_task(), task name: '{}', MPSC rcv timeout error: '{}'", 
374                        self.task_name, e)
375                )?;
376    }
377
378    /// Requests to suspent the current `task`.
379    /// 
380    /// This function blocks
381    /// the current thread for the maximum (in worst case) 5 seconds which is a 
382    /// timeout for feedback reception.
383    /// 
384    /// # Returns 
385    /// 
386    /// A [Result] as alias [TimerResult] is returned.
387    /// 
388    /// In case if error is retuned, the operation should be considered as not completed
389    /// correctly and the executor instance is poisoned i.e a bug happened.
390    /// 
391    /// The common errors may be retuned:
392    /// 
393    /// * [TimerErrorType::MpscTimeout] - a feedback (with the result) reception timeout.
394    ///     Probably this is because the task executor is too busy.
395    /// 
396    /// * [TimerErrorType::TimerError] - with the timer error.
397    pub 
398    fn suspend_task(&self) -> TimerResult<()>
399    {
400        let weak_ptgi = Arc::downgrade(self.guard.as_ref().unwrap());
401
402        let (snd, rcv) = mpsc::channel::<Result<(), TimerError>>();
403
404        self.spt.send_global_cmd(GlobalTasks::SuspendTask(weak_ptgi, Some(snd)))?;
405
406        return 
407            rcv
408                .recv_timeout(Duration::from_secs(10))
409                .map_err(|e|
410                    map_timer_err!(TimerErrorType::MpscTimeout, "suspend_task(), task name: '{}', MPSC rcv timeout error: '{}'", 
411                        self.task_name, e)
412                )?;
413    }
414
415    /// Requests to resume the task from the suspend state.
416    /// 
417    /// This function blocks
418    /// the current thread for the maximum (in worst case) 5 seconds which is a 
419    /// timeout for feedback reception.
420    /// 
421    /// # Returns 
422    /// 
423    /// A [Result] as alias [TimerResult] is returned.
424    /// 
425    /// In case if error is retuned, the operation should be considered as not completed
426    /// correctly and the executor instance is poisoned i.e a bug happened.
427    /// 
428    /// The common errors may be retuned:
429    /// 
430    /// * [TimerErrorType::MpscTimeout] - a feedback (with the result) reception timeout.
431    ///     Probably this is because the task executor is too busy.
432    /// 
433    /// * [TimerErrorType::TimerError] - with the timer error.
434    pub 
435    fn resume_task(&self) -> TimerResult<()>
436    {
437        let weak_ptgi = Arc::downgrade(self.guard.as_ref().unwrap());
438
439        let (snd, rcv) = mpsc::channel::<Result<(), TimerError>>();
440
441        self.spt.send_global_cmd(GlobalTasks::ResumeTask(weak_ptgi, Some(snd)))?;
442
443        return 
444            rcv
445                .recv_timeout(Duration::from_secs(10))
446                .map_err(|e|
447                    map_timer_err!(TimerErrorType::MpscTimeout, "resume_task(), task name: '{}', MPSC rcv timeout error: '{}'", 
448                        self.task_name, e)
449                )?;
450    }
451}
452
453/// Programs the taks's timer to specific time and mode. This instance is
454/// a wrapper around the [TimerExpMode] as this struct requers the generic to
455/// be specified which defines the type of the time.
456#[derive(Debug, Clone, Copy, PartialEq, Eq)]
457pub enum PeriodicTaskTime
458{
459    /// A provided time is absolute time in future.
460    Absolute(TimerExpMode<AbsoluteTime>),
461
462    /// A provided time is relative to current time.
463    Relative(TimerExpMode<RelativeTime>),
464}
465
466impl From<TimerExpMode<AbsoluteTime>> for PeriodicTaskTime
467{
468    fn from(value: TimerExpMode<AbsoluteTime>) -> Self 
469    {
470        return Self::Absolute(value);
471    }
472}
473
474impl From<TimerExpMode<RelativeTime>> for PeriodicTaskTime
475{
476    fn from(value: TimerExpMode<RelativeTime>) -> Self 
477    {
478        return Self::Relative(value);
479    }
480}
481
482impl fmt::Display for PeriodicTaskTime
483{
484    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result 
485    {
486        match self
487        {
488            Self::Absolute(t) => 
489                write!(f, "{}", t),
490            Self::Relative(t) => 
491                write!(f, "{}", t),
492        }
493    }
494}
495
496impl PeriodicTaskTime
497{
498    /// The timer is set to time up once and for the specific `absolute` time in future 
499    /// (to current realclock time).
500    #[inline]
501    pub 
502    fn exact_time(abs_time: AbsoluteTime) -> Self
503    {
504        return Self::Absolute(TimerExpMode::<AbsoluteTime>::new_oneshot(abs_time));
505    }
506
507    /// The timer is set to time up in the interval for the `relative` time.
508    #[inline]
509    pub 
510    fn interval(rel_time: RelativeTime) -> Self
511    {
512        return Self::Relative(TimerExpMode::<RelativeTime>::new_interval(rel_time));
513    }
514
515    /// The timer is set to time up in the interval for the `relative` time with the initial
516    /// delay.
517    /// 
518    /// # Arguments
519    /// 
520    /// * `start_del_time` - a [RelativeTime] of the initial delay.
521    /// 
522    /// * `rel_int_time` - a [RelativeTime] of the interval.
523    #[inline]
524    pub 
525    fn interval_with_start_delay(start_del_time: RelativeTime, rel_int_time: RelativeTime) -> Self
526    {
527        return Self::Relative(TimerExpMode::<RelativeTime>::new_interval_with_init_delay(start_del_time, rel_int_time));
528    }
529
530    /// Sets the `timer_fd` instance to the specific value stored in the current instance.
531    fn set_timer(&self, timer_fd: &TimerFd) -> TimerResult<()>
532    {
533        match *self
534        {
535            Self::Absolute(timer_exp_mode) => 
536                return 
537                    timer_fd
538                        .get_timer()
539                        .set_time(timer_exp_mode)
540                        .map_err(|e|
541                            map_timer_err!(TimerErrorType::TimerError(e.get_errno()), "cannot set time '{}' for timer: '{}'", timer_exp_mode, timer_fd )
542                        ),
543            Self::Relative(timer_exp_mode) => 
544                return 
545                    timer_fd
546                        .get_timer()
547                        .set_time(timer_exp_mode)
548                        .map_err(|e|
549                            map_timer_err!(TimerErrorType::TimerError(e.get_errno()), "cannot set time '{}' for timer: '{}'", timer_exp_mode, timer_fd )
550                        ),
551        }
552    }
553}
554
555
556/// A instane which contains the task's timer and a task.
557pub(crate) struct PeriodicTaskGuardInner
558{
559    task_name: String,
560
561    /// A task itself. At the moment it is mutexed in order to provide
562    /// the mutability, because the current instance is wrapped into [Arc].
563    /// But, the task is never executed from different threads and mutex
564    /// should be replaced with something that is [Send] and can provide
565    /// mutable reference.
566    task: Mutex<PeriodicTaskHndl>,
567}
568
569impl fmt::Debug for PeriodicTaskGuardInner
570{
571    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result 
572    {
573        f.debug_struct("PeriodicTaskGuardInner").field("task_name", &self.task_name).finish()
574    }
575}
576
577impl fmt::Display for PeriodicTaskGuardInner
578{
579    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result 
580    {
581        write!(f, "task name: '{}'", self.task_name)
582    }
583}
584
585impl PeriodicTaskGuardInner
586{
587    fn new(task_name: String, task_inst: PeriodicTaskHndl) -> TimerResult<Self>
588    {
589        return Ok(Self {  task_name: task_name, task: Mutex::new(task_inst) });
590    }
591
592
593}
594
595/// Internal structure for every worker thread.
596struct ThreadWorker
597{
598    /// A thread name, not task name.
599    thread_name: String,
600
601    /// An [Injector] queue for the global tasks received from other threads.
602    global_task_injector: Arc<Injector<GlobalTasks>>,
603
604    /// An [Injector] queue for local (specific to current thread) tasks.
605    local_thread_inj: Arc<Injector<ThreadTask>>,
606
607    /// A flag which is used to control the thread's loop. Is set to `false`
608    /// when thread should exit.
609    thread_run_flag: Arc<AtomicBool>,
610
611    /// A shared instance of the [SyncPeriodicTasksInner] which holds all information
612    /// about tasks and FD polling. The thread which aquired the mutex first will
613    /// process the `global_task_injector` queue and poll the timers for the events which
614    /// will be shared over with other threads.
615    spti: Arc<Mutex<SharedPeriodicTasks>>,
616
617    /// A last thread to which the task was assigned.
618    thread_last_id: usize,
619
620    /// A poll interrupt signal sender.
621    poll_int: PollInterrupt,
622
623    /// A error journal, if [Option::None] then no journal available
624    tx_err: Option<Sender<TimerError>>,
625}
626
627impl ThreadWorker
628{
629    fn new(
630        thread_name: String, 
631        global_task_injector: Arc<Injector<GlobalTasks>>, 
632        spti: Arc<Mutex<SharedPeriodicTasks>>, 
633        poll_int: PollInterrupt, 
634        tx_err: Option<Sender<TimerError>>
635    ) -> TimerResult<ThreadHandler>
636    {
637        let local_thread_inj = Arc::new(Injector::<ThreadTask>::new());
638        let thread_run_flag = Arc::new(AtomicBool::new(true));
639        let thread_run_flag_weak = Arc::downgrade(&thread_run_flag);
640
641        let worker = 
642            ThreadWorker
643            {
644                thread_name: 
645                    thread_name.clone(),
646                global_task_injector: 
647                    global_task_injector,
648                local_thread_inj:
649                    local_thread_inj.clone(),
650                thread_run_flag: 
651                    thread_run_flag,
652                spti:
653                    spti,
654                thread_last_id:
655                    0,
656                poll_int:
657                    poll_int,
658                tx_err: 
659                    tx_err
660            };
661
662        let thread_hndl =
663            std::thread::Builder::new()
664                .name(thread_name)
665                .spawn(|| worker.worker())
666                .map_err(|e|
667                    map_timer_err!(TimerErrorType::SpawnError(e.kind()), "{}", e)
668                )?;
669            
670
671        return Ok( ThreadHandler::new(thread_hndl, local_thread_inj, thread_run_flag_weak) );
672    }
673
674
675    fn worker(mut self) -> TimerResult<()>
676    {
677        // force to park to allow the main thread to initialize everything
678        std::thread::park();
679
680        while self.thread_run_flag.load(Ordering::Acquire) == true
681        {
682            // check local queue for the task or token
683            while let Steal::Success(task) = self.local_thread_inj.steal()
684            {
685                match task
686                {
687                    ThreadTask::TaskExec(task_exec) =>
688                    {
689                        let Some(ptgi) = task_exec.ptgi.upgrade()
690                            else
691                            {   
692                                // task was removed while was in queue.
693                                continue;
694                            };
695
696                        // call task
697                        match ptgi.task.lock().unwrap().exec()
698                        {
699                            PeriodicTaskResult::Ok => 
700                                {},
701                            PeriodicTaskResult::CancelTask => 
702                            {
703                                self
704                                    .global_task_injector
705                                    .push(GlobalTasks::SuspendTask(task_exec.ptgi.clone(), None));
706
707                                let _ = self.poll_int.aquire().map(|v| v.interrupt_drop());
708                            },
709                            PeriodicTaskResult::TaskReSchedule(ptt) => 
710                            {
711                                self
712                                    .global_task_injector
713                                    .push(GlobalTasks::ReschedTask(task_exec.ptgi.clone(), ptt, None));
714
715                                let _ = self.poll_int.aquire().map(|v| v.interrupt_drop());
716                            }
717                        }
718
719                        drop(task_exec);
720                    }
721                }
722            }
723
724            let spti_lock_res =  self.spti.try_lock();
725
726            if let Ok(mut task_token) = spti_lock_res
727            {
728                let thread_hndl_cnt = task_token.thread_pool.get().unwrap().len();
729
730                // check global task queue
731                while let Steal::Success(task) = self.global_task_injector.steal()
732                {
733                    match task
734                    {
735                        GlobalTasks::AddTask(task_name, ptt, ptg, opt_err_ret) => 
736                        {
737                            if task_token.contains_task(&task_name) == true
738                            {
739                                if let Some(err_ret) = opt_err_ret
740                                {
741                                    let err_msg = 
742                                        map_timer_err!(TimerErrorType::Duplicate, 
743                                            "thread: '{}', task: '{}' already exists", self.thread_name, task_name);
744
745                                    let _ = err_ret.send(Err(err_msg));
746                                }
747
748                                continue;
749                            }
750
751                            let period_task_ticket = 
752                                PeriodicTaskTicket::new(task_name.clone(), ptt, ptg, &task_token.timers_poll);
753
754                            if let Err(e) = period_task_ticket 
755                            {
756                                if let Some(err_ret) = opt_err_ret
757                                {
758                                    let _ = err_ret.send(Err(e));
759                                }
760
761                                continue;
762                            }
763
764                            let period_task_ticket = period_task_ticket.unwrap();
765                            
766                            task_token.insert_task(period_task_ticket);
767                            
768                            if let Some(err_ret) = opt_err_ret
769                            {
770                                let _ = err_ret.send(Ok(()));
771                            }
772                        },
773                        GlobalTasks::RemoveTask(ptg_arc, opt_err_ret) => 
774                        {
775                            let res = task_token.remove_task(&ptg_arc.task_name);
776
777                            if let Err(e) = res
778                            {
779                                if let Some(err_ret) = opt_err_ret
780                                {
781                                    let err_msg = 
782                                        map_timer_err!(e.get_error_type(), 
783                                            "thread: '{}', {}", self.thread_name, e.get_error_msg());
784
785                                    let _ = err_ret.send(Err(err_msg));
786                                }
787
788                                continue;
789                            }
790
791                            let ptt_ref = res.unwrap();
792
793                            //let ptt = ptt_ref.into_inner();
794
795                            // stop timer
796                            if let Err(e) = ptt_ref.unset_timer()
797                            {
798                                if let Some(err_ret) = opt_err_ret.as_ref()
799                                {
800                                    let _ = err_ret.send(Err(e));
801                                }
802
803                                continue;
804                            }
805
806                            drop(ptt_ref);
807
808                            /*if let Some(err_ret) = opt_err_ret
809                            {
810                                let _ = err_ret.send(Ok(()));
811                            }
812
813                            if let Some(ptt_ref) = task_token.tasks.remove(&ptg_arc.task_name)
814                            {
815                                
816                            }
817                            else
818                            {
819                                if let Some(err_ret) = opt_err_ret
820                                {
821                                    let err_msg = 
822                                        map_timer_err!(TimerErrorType::NotFound, 
823                                            "thread: '{}', task: '{}' was not found", self.thread_name, ptg_arc.task_name);
824
825                                    let _ = err_ret.send(Err(err_msg));
826                                }
827                            }*/
828
829                            drop(ptg_arc);
830                        },
831                        GlobalTasks::ReschedTask( ptg_weak, ptt, opt_err_ret ) =>
832                        {
833                            let Some(ptg_arc) = ptg_weak.upgrade()
834                                else
835                                {   
836                                    // task was removed while was in queue.
837                                    continue;
838                                };
839
840                            let res_task = task_token.get_task_by_name(&ptg_arc.task_name);
841
842                            let res = 
843                                match res_task
844                                {
845                                    Ok(task) => 
846                                    {
847                                        // stop timer
848                                        let _ = task.unset_timer();
849
850                                        // update timer value and set timer
851                                        let res = task.update_task_time(ptt);
852
853                                        if let Err(e) = res
854                                        {
855                                            if let Some(err_ret) = opt_err_ret.as_ref()
856                                            {
857                                                let _ = err_ret.send(Err(e));
858                                            }
859
860                                            continue;
861                                        }
862
863                                        Ok(())
864                                    },
865                                    Err(err) => 
866                                    {
867                                        Err(err)
868                                    }
869                                };
870
871                            if let Some(err_ret) = opt_err_ret
872                            {
873                                let _ = err_ret.send(res);
874                            }
875                        },
876                        GlobalTasks::SuspendTask(ptg_weak, opt_err_ret) =>
877                        {
878                            let Some(ptg_arc) = ptg_weak.upgrade()
879                                else
880                                {   
881                                    // task was removed while was in queue.
882                                    continue;
883                                };
884
885                            /*let res_task = 
886                                task_token
887                                    .tasks
888                                    .get(&ptg_arc.task_name)
889                                    .ok_or_else(||
890                                        map_timer_err!(TimerErrorType::NotFound, 
891                                            "thread: '{}', task: '{}' was not found", self.thread_name, ptg_arc.task_name)
892                                    );*/
893
894                            let res_task = task_token.get_task_by_name(&ptg_arc.task_name);
895
896                            if let Err(e) = res_task
897                            {
898                                if let Some(err_ret) = opt_err_ret.as_ref()
899                                {
900                                    let _ = err_ret.send(Err(e));
901                                }
902
903                                continue;
904                            }
905
906                            let task = res_task.unwrap();
907
908                            let res = task.unset_timer();
909
910                            if let Some(err_ret) = opt_err_ret
911                            {
912                                let _ = err_ret.send(res);
913                            }
914                        },
915                        GlobalTasks::ResumeTask(ptg_weak, opt_err_ret) =>
916                        {
917                            let Some(ptg_arc) = ptg_weak.upgrade()
918                                else
919                                {   
920                                    // task was removed while was in queue.
921                                    continue;
922                                };
923
924                            /*let res_task = 
925                                task_token
926                                    .tasks
927                                    .get(&ptg_arc.task_name)
928                                    .ok_or_else(||
929                                        map_timer_err!(TimerErrorType::NotFound, 
930                                            "thread: '{}', timer with FD: '{}' was not found", self.thread_name, ptg_arc.task_name)
931                                    );*/
932
933                            let res_task = task_token.get_task_by_name(&ptg_arc.task_name);
934
935                            if let Err(e) = res_task
936                            {
937                                if let Some(err_ret) = opt_err_ret.as_ref()
938                                {
939                                    let _ = err_ret.send(Err(e));
940                                }
941
942                                continue;
943                            }
944
945                            let res = res_task.unwrap().set_timer();
946
947                            if let Some(err_ret) = opt_err_ret.as_ref()
948                            {
949                                let _ = err_ret.send(res);
950                            }
951                        }
952                    }
953                }
954
955                // poll
956                let Some(res) = 
957                    task_token
958                        .timers_poll
959                        .poll(Some(5000))?
960                else
961                    {
962                        continue;
963                    };
964
965                for event in res
966                {
967                    match event
968                    {
969                        PollEventType::TimerRes(timer_fd, timer_res) => 
970                        {
971                            let task_by_id = 
972                                task_token
973                                    .get_task_by_timer_id(timer_fd)
974                                    .map_err(|e|
975                                        map_timer_err!(e.get_error_type(), "thread: '{}', {}", self.thread_name, e.get_error_msg())
976                                    )
977                                    .map(|task|
978                                        (task.ptg.clone(), task.ptt.clone(), task.weak_ticket.upgrade(), task.task_name.clone())
979                                    );
980
981                            let Ok((ptg, ptt, ticket_arc_opt, task_name)) = task_by_id
982                                else
983                                {
984                                    if let Some(tx_err) = self.tx_err.as_ref()
985                                    {
986                                        let _ = tx_err.send(task_by_id.err().unwrap());
987                                    }
988
989                                    continue;
990                                };
991                                
992
993                            // check that PeriodicTaskGuard is ok
994                            let Some(ptg_arc) = ptg.upgrade()
995                                else
996                                {
997                                    //let task_name = task.task_name.clone();
998
999                                    // if Arc cannot be upgraded, then remove the task as removed
1000                                    //task_token.tasks.remove(&timer_fd);
1001                                    if let Err(e) = task_token.remove_task(&task_name)
1002                                    {
1003                                        if let Some(tx_err) = self.tx_err.as_ref()
1004                                        {
1005                                            let _ = tx_err.send(e);
1006                                        }
1007                                    }
1008
1009                                    // continue event handling
1010                                    continue;
1011                                };
1012
1013                            // check timer result
1014                            let overflow_cnt: u64 = 
1015                                match timer_res
1016                                {
1017                                    TimerReadRes::Ok(overfl) => 
1018                                    {
1019                                        overfl
1020                                    },
1021                                    TimerReadRes::Cancelled => 
1022                                    {
1023                                        // the system time was modified, resched instance
1024
1025                                        self
1026                                            .global_task_injector
1027                                            .push(
1028                                                GlobalTasks::ReschedTask(ptg.clone(), ptt.clone(), None)
1029                                            );
1030
1031                                        // timer was reset continue
1032                                        continue;
1033                                    },
1034                                    TimerReadRes::WouldBlock => 
1035                                    {
1036                                        // should not happen, silently ignore or panic?
1037                                        panic!("assertion trap: timer retuned WouldBlock, {}", ptg_arc);
1038                                    }
1039                                };
1040
1041                            // check if ticket presents, if not create
1042                           // let ticket_arc_opt = task.weak_ticket.upgrade();
1043
1044
1045                            let ticket = 
1046                                match ticket_arc_opt
1047                                {
1048                                    Some(ticket) => 
1049                                        ticket,
1050                                    None => 
1051                                    {    
1052                                        let task_thread = 
1053                                            {
1054                                                /*let thread_local_hnd = 
1055                                                    task_token
1056                                                        .thread_pool
1057                                                        .get()
1058                                                        .unwrap()[self.thread_last_id]
1059                                                        .get_thread_handler();*/
1060
1061                                               
1062
1063                                                self.thread_last_id = (self.thread_last_id + 1) % thread_hndl_cnt;
1064
1065                                                //thread_local_hnd
1066                                                task_token.clone_thread_handler(self.thread_last_id)
1067                                            }; 
1068
1069                                        let ticket =
1070                                                Arc::new(NewTaskTicket::new(task_thread, ptg.clone()));
1071
1072                                        let task = 
1073                                            task_token
1074                                                .get_task_by_timer_id(timer_fd)
1075                                                .map_err(|e|
1076                                                    map_timer_err!(e.get_error_type(), "thread: '{}', {}", self.thread_name, e.get_error_msg())
1077                                                )?;
1078                                                
1079                                        task.weak_ticket = Arc::downgrade(&ticket);
1080
1081                                        ticket
1082                                    }
1083                                };
1084                            
1085                            NewTaskTicket::send_task(ticket, overflow_cnt, thread_hndl_cnt);
1086                        },
1087                        _ => 
1088                        {
1089                            // ignore
1090                        },
1091                    }
1092                } // for
1093            }
1094            else if let Err(TryLockError::WouldBlock) = spti_lock_res
1095            {
1096                if self.thread_run_flag.load(Ordering::Acquire) == false
1097                {
1098                    return Ok(());
1099                }
1100
1101                if self.local_thread_inj.is_empty() == true
1102                {
1103                    std::thread::park_timeout(Duration::from_secs(2));
1104                }
1105            } 
1106            
1107        } // while
1108
1109        return Ok(());
1110    }
1111}
1112
1113
1114/// A common instance which contains the specific thread handler and
1115/// a [Weak] reference to the loop controlling flag.
1116#[derive(Debug)]
1117struct ThreadHandler
1118{
1119    /// A thread join handler
1120    hndl: JoinHandle<TimerResult<()>>,
1121
1122    /// A local task injector.
1123    task_injector: Arc<Injector<ThreadTask>>,
1124
1125    /// A flag which controls the thread loop. Initialized as `true`.
1126    thread_flag: Weak<AtomicBool>,
1127}
1128
1129impl ThreadHandler
1130{
1131    fn new(hndl: JoinHandle<TimerResult<()>>, task_injector: Arc<Injector<ThreadTask>>, thread_flag: Weak<AtomicBool>) -> Self
1132    {            
1133        return 
1134            Self
1135            {
1136                hndl,
1137                task_injector,
1138                thread_flag: thread_flag
1139            };
1140    }
1141
1142    fn stop(&self)
1143    {
1144        if let Some(v) = self.thread_flag.upgrade()
1145        {
1146            v.store(false, Ordering::Release);
1147        }
1148    }
1149
1150    fn unpark(&self) 
1151    {
1152        self.hndl.thread().unpark();
1153    }
1154
1155    fn send_task(&self, task: Arc<NewTaskTicket>, unpark: bool)
1156    {
1157        self.task_injector.push(ThreadTask::TaskExec(task));
1158
1159        if unpark == true
1160        {
1161            self.hndl.thread().unpark();
1162        }
1163
1164        return;
1165    }
1166
1167    fn clean_local_queue(&self)
1168    {
1169        while let Steal::Success(_) = self.task_injector.steal() {}
1170
1171        return;
1172    }
1173}
1174
1175/// A instance which contains all information about the threads allocated for the
1176/// task execution, tasks and timer polling. An instance is shared with all threads.
1177#[derive(Debug)]
1178pub struct SharedPeriodicTasks
1179{
1180    /// A list of the threads. A [OnceCell] is used to initialize the field once.
1181    thread_pool: OnceCell<Arc<Vec<Arc<ThreadHandler>>>>,
1182
1183    /// A [HashMap] of the regestered tasks. Each task is mapped to its timer FD.
1184    tasks_by_timer_fd: HashMap<TimerId, PeriodicTaskTicket>,
1185
1186    /// A task name to [TimerId] map.
1187    tasks_name_to_timer_id: HashMap<String, TimerId>,
1188
1189    /// A timer event poll. 
1190    timers_poll: TimerPoll,
1191}
1192
1193
1194impl SharedPeriodicTasks
1195{ 
1196    fn new() -> TimerResult<Self>
1197    {
1198
1199        return Ok( 
1200            Self 
1201            { 
1202                thread_pool: 
1203                    OnceCell::default(),
1204                tasks_by_timer_fd: 
1205                    HashMap::new(),
1206                tasks_name_to_timer_id: 
1207                    HashMap::new(),
1208                timers_poll: 
1209                    TimerPoll::new()?
1210            }
1211        );
1212    }
1213
1214    /// Returns the mutable reference to [PeriodicTaskTicket] by the [TimerId].
1215    fn get_task_by_timer_id(&mut self, timer_fd: TimerId) -> TimerResult<&mut PeriodicTaskTicket>
1216    {
1217        return 
1218            self
1219                .tasks_by_timer_fd
1220                .get_mut(&timer_fd)
1221                .ok_or_else(||
1222                    map_timer_err!(TimerErrorType::NotFound, "task fd: '{}' was found but task was not found", 
1223                        timer_fd)
1224                );
1225    }
1226
1227    /// Returns the mutable reference to [PeriodicTaskTicket] by the task name.
1228    /// 
1229    /// This function is slower than [Self::get_task_by_timer_id] because it performs
1230    /// look in two tables.
1231    fn get_task_by_name(&mut self, task_name: &str) -> TimerResult<&mut PeriodicTaskTicket>
1232    {
1233        let res = 
1234            self
1235                .tasks_name_to_timer_id
1236                .get(task_name)
1237                .ok_or_else(||
1238                    map_timer_err!(TimerErrorType::NotFound, "task: '{}' was not found", task_name)
1239                )
1240                .map(|v|
1241                    self
1242                        .tasks_by_timer_fd
1243                        .get_mut(v)
1244                        .ok_or_else(||
1245                            map_timer_err!(TimerErrorType::NotFound, "task: '{}' fd: '{}' was found but task was not found", 
1246                                task_name, v)
1247                        )
1248                )??;
1249
1250        return Ok(res);
1251    }
1252
1253    /// Clones the [ThreadHandler] by the `thread_last_id`.
1254    /// 
1255    /// # Panics
1256    /// 
1257    /// Will panic if `thread_last_id` is out of range.
1258    fn clone_thread_handler(&self, thread_last_id: usize) -> Arc<ThreadHandler>
1259    {
1260        let thread_local_hnd = 
1261            self
1262                .thread_pool
1263                .get()
1264                .unwrap()[thread_last_id]
1265                .clone();
1266
1267        return thread_local_hnd;
1268    }
1269
1270    /// Removes the task by `task_name`.
1271    fn remove_task(&mut self, task_name: &str) -> TimerResult<PeriodicTaskTicket>
1272    {
1273        let Some(task_timer_fd) = self.tasks_name_to_timer_id.remove(task_name)
1274            else
1275            {
1276                timer_err!(TimerErrorType::NotFound, "task: '{}' was not found", task_name);
1277            };
1278
1279        let Some(ptt) = self.tasks_by_timer_fd.remove(&task_timer_fd)
1280            else
1281            {
1282                timer_err!(TimerErrorType::NotFound, "task: '{}' fd: '{}' was found but task was not found", 
1283                    task_name, task_timer_fd);
1284            };
1285
1286        return Ok(ptt);
1287    }
1288
1289    /// Returns `true` if task with name `task_name` exists.
1290    fn contains_task(&self, task_name: &str) -> bool
1291    {
1292        return self.tasks_name_to_timer_id.contains_key(task_name);
1293    }
1294
1295    /// Inserts the `task` into scheduler list.
1296    fn insert_task(&mut self, period_task_ticket: PeriodicTaskTicket)
1297    {
1298        let timer_id = period_task_ticket.sync_timer.get_inner().as_timer_id();
1299
1300        self.tasks_name_to_timer_id.insert(period_task_ticket.task_name.clone(), timer_id);
1301        self.tasks_by_timer_fd.insert(timer_id, period_task_ticket);
1302
1303        return;
1304    }
1305}
1306
1307/// An `inner` of the [SyncPeriodicTasks].
1308#[derive(Debug)]
1309pub struct SyncPeriodicTasksInner
1310{
1311    /// A [Weak] reference to the [EventFd] of the timer `poll` which is used to interrupt the polling.
1312    poll_int: PollInterrupt,
1313
1314    /// A `global` FIFO which is used to send commands to the task executor.
1315    task_injector: Arc<Injector<GlobalTasks>>,
1316
1317    /// An error journal
1318    error_journal: Option<Mutex<Receiver<TimerError>>>,
1319}
1320
1321impl SyncPeriodicTasksInner
1322{
1323    fn send_global_cmd(&self, glob: GlobalTasks) -> TimerResult<()>
1324    {
1325        let poll_int = 
1326            self.poll_int.aquire()?;
1327                
1328        self.task_injector.push(glob);
1329
1330        poll_int.interrupt_drop()?;
1331
1332        return Ok(());
1333    }
1334
1335    fn clear_global_queue(&self)
1336    {
1337        while let Steal::Success(_) = self.task_injector.steal() {}
1338
1339        return;
1340    }
1341}
1342
1343/// A wrapper for the task which is described in closure.
1344struct SyncCallableOper
1345{
1346    op: Box<dyn FnMut() -> PeriodicTaskResult>,
1347}
1348
1349unsafe impl Send for SyncCallableOper {}
1350
1351impl PeriodicTask for SyncCallableOper
1352{
1353    fn exec(&mut self) -> PeriodicTaskResult 
1354    {
1355        return (self.op)();
1356    }
1357}
1358
1359/// A `main` instance which spawns the task executor.
1360/// 
1361/// ```ignore
1362/// let spt = SyncPeriodicTasks::new(1.try_into().unwrap()).unwrap();
1363/// let task1 = TaskStruct1::new(2, send);
1364/// let task1_ptt = PeriodicTaskTime::Relative(TimerExpMode::<RelativeTime>::new_interval(RelativeTime::new_time(1, 0)));
1365/// let task1_guard = s.add("task1", task1, task1_ptt).unwrap();
1366/// ```
1367/// 
1368/// This instance should be kept somewhere and dropped only if the task executor WITH
1369/// all spawned tasks are no longer needed.
1370#[derive(Debug, Clone)]
1371pub struct SyncPeriodicTasks
1372{
1373    threads: Option<Arc<Vec<Arc<ThreadHandler>>>>,
1374
1375    inner: Arc<SyncPeriodicTasksInner>,
1376}
1377
1378impl Drop for SyncPeriodicTasks
1379{
1380    fn drop(&mut self) 
1381    {
1382        self.inner.clear_global_queue();
1383
1384        let mut threads = self.threads.take().unwrap();
1385
1386        // stop all threads
1387        for thread in threads.iter()
1388        {
1389            thread.stop();
1390            thread.unpark();
1391        }
1392
1393        // interrupt poll
1394        let _ = self.inner.poll_int.aquire().map(|v| v.interrupt_drop());
1395
1396        for _ in 0..5
1397        {
1398            let threads_unwr = 
1399                match Arc::try_unwrap(threads)
1400                {
1401                    Ok(r) => r,
1402                    Err(e) =>
1403                    {
1404                        threads = e;
1405
1406                        std::thread::sleep(Duration::from_millis(500));
1407
1408                        continue;
1409
1410                    }
1411                };
1412
1413            for thread in threads_unwr
1414            {
1415                thread.clean_local_queue();
1416
1417                let Some(thread) = Arc::into_inner(thread)
1418                else
1419                {
1420                    panic!("assertion trap: ~SyncPeriodicTasks, a reference to ThreadHandler left somewhere");
1421                };
1422
1423                let _ = thread.hndl.join();
1424            }
1425
1426            break;
1427        }
1428    }
1429}
1430
1431
1432impl SyncPeriodicTasks
1433{
1434
1435    
1436    /// Creates new instance. An amount of threads allocated for the task executor
1437    /// should be specified. All threads will be started immidiatly. For small
1438    /// tasks one thread will be enough. For a large amount of tasks, especially it
1439    /// tasks are waken up oftenly then at least two threads should be allocated.
1440    /// 
1441    /// # Arguments
1442    /// 
1443    /// * `threads_cnt` - a [NonZeroUsize] amount of threads.
1444    /// 
1445    /// # Returns
1446    /// 
1447    /// The [Result] is returned as alias [TimerResult].
1448    pub 
1449    fn new(threads_cnt: NonZeroUsize, error_report: bool) -> TimerResult<Self>
1450    {
1451        let spti = SharedPeriodicTasks::new()?;
1452        let poll_int = spti.timers_poll.get_poll_interruptor();
1453
1454        // wrap into the mutex because this instance is shared 
1455        let spti = Arc::new(Mutex::new(spti));
1456
1457        let task_injector = Arc::new(Injector::<GlobalTasks>::new());
1458
1459        let mut thread_hndls: Vec<Arc<ThreadHandler>> = Vec::with_capacity(threads_cnt.get());
1460
1461        // error report
1462        let err_journal =
1463            if error_report == true
1464            {
1465                Some(mpsc::channel::<TimerError>())
1466            }
1467            else
1468            {
1469                None
1470            };
1471
1472        // spawn threads, all spawn threads will be parked by default.
1473        for i in 0..threads_cnt.get()
1474        {
1475            let handler = 
1476                ThreadWorker::new(
1477                    format!("timer_exec/{}s", i), 
1478                    task_injector.clone(), 
1479                    spti.clone(), 
1480                    poll_int.clone(), 
1481                    err_journal.as_ref().map(|c| c.0.clone())
1482                )?;
1483
1484            thread_hndls.push(Arc::new(handler));
1485        }
1486
1487        let thread_hndls = Arc::new(thread_hndls);
1488
1489        // Lock the instance to initialze the `thread_pool` variable.
1490        let spti_lock = spti.lock().unwrap();
1491        spti_lock.thread_pool.get_or_init(|| thread_hndls.clone());
1492
1493        // get a random thread to unpark it and start the process
1494        let thread = 
1495            spti_lock
1496                .thread_pool
1497                .get()
1498                .unwrap()
1499                .get(random_range(0..threads_cnt.get()))
1500                .unwrap()
1501                .clone();
1502
1503        drop(spti_lock);
1504
1505        // unpark thread
1506        thread.unpark();
1507
1508        let inner = 
1509            SyncPeriodicTasksInner
1510            {
1511                poll_int: 
1512                    poll_int,
1513                task_injector: 
1514                    task_injector,
1515                error_journal: 
1516                    err_journal.map(|j| Mutex::new(j.1))
1517            };
1518
1519        return Ok( 
1520            Self 
1521            { 
1522                threads: Some(thread_hndls),
1523                inner: Arc::new(inner),
1524            }
1525        );
1526    }
1527
1528    /// Adds and spawns the task. The task is defined with generic `T` which is any 
1529    /// datatype which implements [PeriodicTask].
1530    /// 
1531    /// # Arguments
1532    /// 
1533    /// * `task_name` - a task name. used only for identification purposes in debug messages.
1534    /// 
1535    /// * `task` - a task which should be executed. It should implenet [PeriodicTask].
1536    /// 
1537    /// * `task_time` - [PeriodicTaskTime] a time when the task must be spawned.
1538    /// 
1539    /// # Returns
1540    /// 
1541    /// The [Result] is returned as alias [TimerResult].
1542    /// 
1543    /// # Example
1544    /// 
1545    /// ```ignore
1546    /// #[derive(Debug)]
1547    /// struct TaskStruct1
1548    /// {
1549    ///     a1: u64,
1550    ///     s: Sender<u64>,
1551    /// }
1552    /// 
1553    /// impl TaskStruct1
1554    /// {
1555    ///     fn new(a1: u64, s: Sender<u64>) -> Self
1556    ///     {
1557    ///         return Self{ a1: a1, s };
1558    ///     }
1559    /// }
1560    /// 
1561    /// impl PeriodicTask for TaskStruct1
1562    /// {
1563    ///     fn exec(&mut self) -> PeriodicTaskResult
1564    ///     {
1565    ///         println!("taskstruct1 val: {}", self.a1);
1566    ///         let _ = self.s.send(self.a1);
1567    ///         return PeriodicTaskResult::Ok;
1568    ///     }
1569    /// }
1570    /// 
1571    /// fn main()
1572    /// {
1573    ///     let s = SyncPeriodicTasks::new(1.try_into().unwrap()).unwrap();
1574    ///     
1575    ///     let (send, recv) = mpsc::channel::<u64>();
1576    /// 
1577    ///     let task1 = TaskStruct1::new(2, send);
1578    /// 
1579    ///     let task1_ptt = PeriodicTaskTime::exact_time(AbsoluteTime::now() + RelativeTime::new_time(3, 0));
1580    /// 
1581    ///     let task1_guard = s.add("task1", task1, task1_ptt).unwrap();
1582    /// 
1583    ///     println!("added");
1584    /// 
1585    ///     let val = recv.recv();
1586    /// 
1587    ///     println!("{:?}", val);
1588    /// 
1589    ///     drop(task1_guard);
1590    /// }
1591    /// ```
1592    pub 
1593    fn add<T>(&self, task_name: impl Into<String>, task: T, task_time: PeriodicTaskTime) -> TimerResult<PeriodicTaskGuard>
1594    where T: PeriodicTask
1595    {
1596        let task_int: PeriodicTaskHndl = Box::new(task);
1597
1598        let task_name_str: String = task_name.into();
1599
1600        let period_task_guard = 
1601            Arc::new(PeriodicTaskGuardInner::new(task_name_str.clone(), task_int)?);
1602
1603
1604       /* let period_task_ticket = 
1605            PeriodicTaskTicket::new(task_name_str.clone(), task_time, Arc::downgrade(&period_task_guard));*/
1606
1607        let (mpsc_send, mpsc_recv) = mpsc::channel();
1608
1609        self.inner.send_global_cmd(GlobalTasks::AddTask(task_name_str.clone(), task_time, period_task_guard.clone(), Some(mpsc_send)) )?;
1610
1611        let _ = 
1612            mpsc_recv
1613                .recv()
1614                .map_err(|e|
1615                    map_timer_err!(TimerErrorType::ExternalError, "mpsc error: {}", e)
1616                )??;
1617
1618        let ret = 
1619            PeriodicTaskGuard
1620            {
1621                task_name: task_name_str,
1622                guard: Some(period_task_guard),
1623                spt: self.inner.clone()
1624            };
1625
1626        return Ok(ret);
1627    }
1628
1629    /// Adds and spawns the task in form of a closure.
1630    /// 
1631    /// # Arguments
1632    /// 
1633    /// * `task_name` - a task name. used only for identification purposes in debug messages.
1634    /// 
1635    /// * `task_time` - [PeriodicTaskTime] a time when the task must be spawned.
1636    /// 
1637    /// * `clo` - a function to exec on timeout. The function must return [PeriodicTaskResult].
1638    /// 
1639    /// # Returns
1640    /// 
1641    /// The [Result] is returned as alias [TimerResult].
1642    /// 
1643    /// # Example
1644    /// 
1645    /// ```ignore
1646    /// 
1647    /// let task1_ptt = PeriodicTaskTime::exact_time(AbsoluteTime::now() + RelativeTime::new_time(3, 0));
1648    /// 
1649    /// let (send, recv) = mpsc::channel::<u64>();
1650    /// 
1651    /// let task1_guard = 
1652    ///     s.add_closure("task2", task1_ptt, 
1653    ///         move || 
1654    ///         { 
1655    ///             println!("test output"); 
1656    ///             send.send(1).unwrap();
1657    /// 
1658    ///             return PeriodicTaskResult::Ok;
1659    ///         }
1660    ///     ).unwrap();
1661    /// 
1662    /// let val = recv.recv();
1663    /// 
1664    /// println!("{:?}", val);
1665    /// ```
1666    pub 
1667    fn add_closure<F>(&self, task_name: impl Into<String>, task_time: PeriodicTaskTime, clo: F) -> TimerResult<PeriodicTaskGuard>
1668    where F: 'static + FnMut() -> PeriodicTaskResult + Send
1669    {
1670        let closure_task = SyncCallableOper{ op: Box::new(clo) };
1671
1672        return self.add(task_name, closure_task, task_time);
1673    }
1674    /// Checks if any thread have crashed and no longer works.
1675    /// 
1676    /// # Returns
1677    /// 
1678    /// A [Option] is retuerned with the inner data:
1679    /// 
1680    /// * [Option::Some] with the thread name [String] that have quit.
1681    /// 
1682    /// * [Option::None] indicating that everthing is fine.
1683    pub 
1684    fn check_thread_status(&self) -> Option<String>
1685    {
1686        for thread in self.threads.as_ref().unwrap().iter()
1687        {
1688            if let None = thread.thread_flag.upgrade()
1689            {
1690                return Some(thread.hndl.thread().name().unwrap().to_string());
1691            }
1692        }
1693
1694        return None;
1695    }
1696
1697    /// Reads the error from the journal. This function does not block current thread
1698    /// when receiving, but it will wait for the [Mutex] release.
1699    /// 
1700    /// # Returns
1701    /// 
1702    /// * If any message is available the [Option::Some] with [TimerError] is returned.
1703    /// 
1704    /// * If instance is not available or no errors, the [Option::None] is returned.
1705    pub 
1706    fn read_error(&self) -> Option<TimerError>
1707    {
1708        let Some(rx) = self.inner.error_journal.as_ref()
1709            else { return None };
1710
1711        let Ok(err) = 
1712            rx
1713                .lock()
1714                .unwrap_or_else(|e| e.into_inner())
1715                .recv_timeout(Duration::from_secs(0))
1716        else { return None };
1717
1718        return Some(err);
1719    }
1720}
1721
1722#[cfg(test)]
1723mod tests
1724{
1725    use core::fmt;
1726    use std::{sync::mpsc::{self, RecvTimeoutError, Sender}, time::{Duration, Instant}};
1727
1728    use crate::{periodic_task::sync_tasks::{PeriodicTask, PeriodicTaskResult, PeriodicTaskTime, SyncPeriodicTasks}, AbsoluteTime, RelativeTime};
1729
1730    #[derive(Debug)]
1731    struct TaskStruct1
1732    {
1733        a1: u64,
1734        s: Sender<u64>,
1735    }
1736
1737    impl TaskStruct1
1738    {
1739        fn new(a1: u64, s: Sender<u64>) -> Self
1740        {
1741            return Self{ a1: a1, s };
1742        }
1743    }
1744
1745    impl PeriodicTask for TaskStruct1
1746    {
1747        fn exec(&mut self) -> PeriodicTaskResult
1748        {
1749            println!("taskstruct1 val: {}", self.a1);
1750
1751            let _ = self.s.send(self.a1);
1752
1753            return PeriodicTaskResult::Ok;
1754        }
1755    }
1756
1757    #[derive(Debug)]
1758    struct TaskStruct2
1759    {
1760        a1: u64,
1761        s: Sender<u64>,
1762    }
1763
1764    impl TaskStruct2
1765    {
1766        fn new(a1: u64, s: Sender<u64>) -> Self
1767        {
1768            return Self{ a1: a1, s };
1769        }
1770    }
1771
1772    impl PeriodicTask for TaskStruct2
1773    {
1774        fn exec(&mut self) -> PeriodicTaskResult
1775        {
1776            println!("taskstruct2 val: {}", self.a1);
1777
1778            self.s.send(self.a1).unwrap();
1779
1780            return PeriodicTaskResult::TaskReSchedule(PeriodicTaskTime::exact_time(AbsoluteTime::now() + RelativeTime::new_time(2, 0)));
1781        }
1782    }
1783
1784    impl<F> PeriodicTask for F
1785    where F: 'static + FnMut() + Send + fmt::Debug
1786    {
1787        fn exec(&mut self) -> PeriodicTaskResult 
1788        {
1789            (self)();
1790            return PeriodicTaskResult::Ok;
1791        }
1792    }
1793
1794    #[test]
1795    fn ttt()
1796    {
1797        let s = 
1798            SyncPeriodicTasks::new(1.try_into().unwrap(), true).unwrap();
1799
1800        let task1_ptt = 
1801            PeriodicTaskTime
1802                ::exact_time(AbsoluteTime::now() + RelativeTime::new_time(3, 0));
1803
1804        let (send, recv) = mpsc::channel::<u64>();
1805 
1806        let task1_guard = 
1807            s.add_closure("task2", task1_ptt, 
1808                move || 
1809                { 
1810                    println!("test output"); 
1811                    send.send(2).unwrap();
1812
1813                    return PeriodicTaskResult::Ok;
1814                }
1815            ).unwrap();
1816
1817
1818        println!("added");
1819
1820        let val = recv.recv_timeout(Duration::from_millis(4000));
1821
1822        if val.is_err() == true
1823        {
1824            let e =  s.read_error();
1825            println!("ERROR, {:?}",e);
1826
1827            assert_eq!(true, false, "{:?}", e);
1828        }
1829
1830        println!("val: {:?}", val);
1831
1832        assert_eq!(Ok(2), val);
1833
1834        drop(task1_guard);
1835    }
1836
1837    #[test]
1838    fn test1_absolute_simple()
1839    {
1840        let s = SyncPeriodicTasks::new(1.try_into().unwrap(), false).unwrap();
1841
1842        let (send, recv) = mpsc::channel::<u64>();
1843
1844        let task1 = TaskStruct1::new(2, send);
1845        let task1_ptt = PeriodicTaskTime::exact_time(AbsoluteTime::now() + RelativeTime::new_time(3, 0));
1846        let task1_guard = s.add("task1", task1, task1_ptt).unwrap();
1847
1848        println!("added");
1849        let val = recv.recv();
1850
1851        println!("{:?}", val);
1852
1853        drop(task1_guard);
1854    }
1855
1856    #[test]
1857    fn test1_relative_simple()
1858    {
1859        let s = SyncPeriodicTasks::new(1.try_into().unwrap(), false).unwrap();
1860
1861        let (send, recv) = mpsc::channel::<u64>();
1862
1863        let task1 = TaskStruct1::new(2, send);
1864        let task1_ptt = PeriodicTaskTime::interval(RelativeTime::new_time(1, 0));
1865        let task1_guard = s.add("task1", task1, task1_ptt).unwrap();
1866        
1867        let mut s = Instant::now();
1868
1869        for i in 0..3
1870        {
1871            let val = recv.recv().unwrap();
1872
1873            let e = s.elapsed();
1874            s = Instant::now();
1875
1876            println!("{}: {:?} {:?} {}", i, val, e, e.as_micros());
1877            
1878            assert!(999000 < e.as_micros() && e.as_micros() < 10001200);
1879            assert_eq!(val, 2);
1880        }
1881
1882        drop(task1_guard);
1883
1884        std::thread::sleep(Duration::from_millis(100));
1885
1886        return;
1887    }
1888    
1889    #[test]
1890    fn test1_relative_resched_to_abs()
1891    {
1892        let s = SyncPeriodicTasks::new(1.try_into().unwrap(), false).unwrap();
1893
1894        let (send, recv) = mpsc::channel::<u64>();
1895
1896        let task1 = TaskStruct2::new(2, send);
1897        let task1_ptt = PeriodicTaskTime::interval(RelativeTime::new_time(1, 0));
1898        let task1_guard = s.add("task1", task1, task1_ptt).unwrap();
1899
1900        let s = Instant::now();
1901        match recv.recv_timeout(Duration::from_millis(1150))
1902        {
1903            Ok(rcv_a) => 
1904            {
1905                let e = s.elapsed();
1906                println!("{:?} {}", e, e.as_micros());
1907                assert_eq!(rcv_a, 2);
1908                assert!(990051 < e.as_micros() && e.as_micros() < 1020551);
1909            },
1910            Err(RecvTimeoutError::Timeout) => 
1911                panic!("tineout"),
1912            Err(e) =>
1913                panic!("{}", e),
1914        }
1915
1916        let s = Instant::now();
1917        match recv.recv_timeout(Duration::from_millis(2100))
1918        {
1919            Ok(rcv_a) => 
1920            {
1921                let e = s.elapsed();
1922                println!("{:?} {}", e, e.as_micros());
1923                assert_eq!(rcv_a, 2);
1924                assert!(1999642 < e.as_micros() && e.as_micros() < 2008342);
1925            },
1926            Err(RecvTimeoutError::Timeout) => 
1927                panic!("tineout"),
1928            Err(e) =>
1929                panic!("{}", e),
1930        }
1931
1932        let s = Instant::now();
1933        match recv.recv_timeout(Duration::from_millis(2100))
1934        {
1935            Ok(rcv_a) => 
1936            {
1937                let e = s.elapsed();
1938                println!("{:?} {}", e, e.as_micros());
1939                assert_eq!(rcv_a, 2);
1940                assert!(1999642 < e.as_micros() && e.as_micros() < 2003342);
1941            },
1942            Err(RecvTimeoutError::Timeout) => 
1943                panic!("tineout"),
1944            Err(e) =>
1945                panic!("{}", e),
1946        }
1947
1948        drop(task1_guard);
1949
1950        std::thread::sleep(Duration::from_millis(100));
1951
1952        return;
1953    }
1954
1955    #[test]
1956    fn test1_relative_simple_resched()
1957    {
1958        let s = SyncPeriodicTasks::new(1.try_into().unwrap(), false).unwrap();
1959
1960        let (send, recv) = mpsc::channel::<u64>();
1961
1962        let task1 = TaskStruct1::new(2, send);
1963        let task1_ptt = 
1964            PeriodicTaskTime::interval(
1965                RelativeTime::new_time(1, 0)
1966            );
1967        let task1_guard = s.add("task1", task1, task1_ptt).unwrap();
1968        
1969        let mut s = Instant::now();
1970
1971        for i in 0..3
1972        {
1973            let val = recv.recv().unwrap();
1974
1975            let e = s.elapsed();
1976            s = Instant::now();
1977
1978            println!("{}: {:?} {:?} {}", i, val, e, e.as_micros());
1979            
1980            assert!(990000 < e.as_micros() && e.as_micros() < 10001200);
1981            assert_eq!(val, 2);
1982        }
1983
1984        task1_guard
1985            .reschedule_task(
1986                PeriodicTaskTime::exact_time(AbsoluteTime::now() + RelativeTime::new_time(2, 0))
1987            )
1988            .unwrap();
1989
1990        s = Instant::now();
1991        let val = recv.recv().unwrap();
1992        let e = s.elapsed();
1993
1994        println!("resched: {:?} {:?} {}", val, e, e.as_micros());
1995
1996        assert!(1990000 < e.as_micros() && e.as_micros() < 2003560);
1997
1998        let val = recv.recv_timeout(Duration::from_secs(3));
1999
2000        assert_eq!(val.is_err(), true);
2001        assert_eq!(val.err().unwrap(), RecvTimeoutError::Timeout);
2002
2003        drop(task1_guard);
2004
2005        std::thread::sleep(Duration::from_millis(100));
2006
2007        return;
2008    }
2009
2010    #[test]
2011    fn test1_relative_simple_cancel()
2012    {
2013        let s = SyncPeriodicTasks::new(1.try_into().unwrap(), false).unwrap();
2014
2015        let (send, recv) = mpsc::channel::<u64>();
2016
2017        let task1 = TaskStruct1::new(0, send.clone());
2018        let task1_ptt = 
2019            PeriodicTaskTime::interval(RelativeTime::new_time(1, 0));
2020
2021        let task2 = TaskStruct1::new(1, send.clone());
2022        let task2_ptt = 
2023            PeriodicTaskTime::interval(RelativeTime::new_time(2, 0));
2024
2025        let task3 = TaskStruct1::new(2, send);
2026        let task3_ptt = 
2027            PeriodicTaskTime::interval(RelativeTime::new_time(0, 500_000_000));
2028
2029        let task1_guard = s.add("task1", task1, task1_ptt).unwrap();
2030        let task2_guard = s.add("task2", task2, task2_ptt).unwrap();
2031        let task3_guard = s.add("task3", task3, task3_ptt).unwrap();
2032
2033        let mut a_cnt: [u8; 3] = [0_u8; 3];
2034
2035        let end = AbsoluteTime::now() + RelativeTime::new_time(5, 100_000_000);
2036
2037        while AbsoluteTime::now() < end
2038        {
2039            match recv.recv_timeout(Duration::from_millis(1))
2040            {
2041                Ok(rcv_a) => 
2042                    a_cnt[rcv_a as usize] += 1,
2043                Err(RecvTimeoutError::Timeout) => 
2044                    continue,
2045                Err(e) =>
2046                    panic!("{}", e),
2047            }
2048
2049            
2050        }
2051
2052        assert_eq!(a_cnt[0], 5);
2053        assert_eq!(a_cnt[1], 2);
2054        assert_eq!(a_cnt[2], 10);
2055
2056        // drop task #3
2057        task3_guard.suspend_task().unwrap();
2058
2059
2060        let end = AbsoluteTime::now() + RelativeTime::new_time(5, 100_000_000);
2061
2062        while AbsoluteTime::now() < end
2063        {
2064            match recv.recv_timeout(Duration::from_millis(1))
2065            {
2066                Ok(rcv_a) => 
2067                    a_cnt[rcv_a as usize] += 1,
2068                Err(RecvTimeoutError::Timeout) => 
2069                    continue,
2070                Err(e) =>
2071                    panic!("{}", e),
2072            }
2073
2074            
2075        }
2076
2077        assert_eq!(a_cnt[0] > 5, true);
2078        assert_eq!(a_cnt[1] > 2, true);
2079        assert!((a_cnt[2] == 10 || a_cnt[2] == 11));
2080
2081        drop(task1_guard);
2082        drop(task2_guard);
2083        drop(task3_guard);
2084
2085        let end = AbsoluteTime::now() + RelativeTime::new_time(5, 100_000_000);
2086
2087        while AbsoluteTime::now() < end
2088        {
2089            match recv.recv_timeout(Duration::from_millis(1))
2090            {
2091                Ok(rcv_a) => 
2092                    a_cnt[rcv_a as usize] += 1,
2093                Err(RecvTimeoutError::Timeout) => 
2094                    continue,
2095                Err(_) =>
2096                    break,
2097            }
2098
2099            
2100        }
2101
2102        assert_eq!(AbsoluteTime::now() < end, true);
2103
2104        
2105
2106        return;
2107    }
2108
2109    // freebsd text failed with  src/periodic_task/sync_tasks.rs:1784:9 left: 6
2110    #[test]
2111    fn test2_multithread_1()
2112    {
2113        let s = SyncPeriodicTasks::new(2.try_into().unwrap(), false).unwrap();
2114
2115        let (send, recv) = mpsc::channel::<u64>();
2116
2117        let task1 = TaskStruct1::new(0, send.clone());
2118        let task1_ptt = 
2119            PeriodicTaskTime::interval(RelativeTime::new_time(1, 0));
2120
2121        let task2 = TaskStruct1::new(1, send.clone());
2122        let task2_ptt = 
2123            PeriodicTaskTime::interval(RelativeTime::new_time(2, 0));
2124
2125        let task3 = TaskStruct1::new(2, send.clone());
2126        let task3_ptt = 
2127            PeriodicTaskTime::interval(RelativeTime::new_time(0, 500_000_000));
2128
2129        let task4 = TaskStruct1::new(3, send.clone());
2130        let task4_ptt = 
2131            PeriodicTaskTime::interval(RelativeTime::new_time(0, 200_000_000));
2132
2133        let task5 = TaskStruct1::new(4, send.clone());
2134        let task5_ptt = 
2135            PeriodicTaskTime::exact_time(AbsoluteTime::now() + RelativeTime::new_time(5, 0));
2136
2137        let task1_guard = s.add("task1", task1, task1_ptt).unwrap();
2138        let task2_guard = s.add("task2", task2, task2_ptt).unwrap();
2139        let task3_guard = s.add("task3", task3, task3_ptt).unwrap();
2140        let task4_guard = s.add("task4", task4, task4_ptt).unwrap();
2141        let task5_guard = s.add("task5", task5, task5_ptt).unwrap();
2142
2143
2144        let mut a_cnt: [u8; 5] = [0_u8; 5];
2145
2146        let end = AbsoluteTime::now() + RelativeTime::new_time(5, 500_000_000);
2147
2148        while AbsoluteTime::now() < end
2149        {
2150            match recv.recv_timeout(Duration::from_millis(1))
2151            {
2152                Ok(rcv_a) => 
2153                    a_cnt[rcv_a as usize] += 1,
2154                Err(RecvTimeoutError::Timeout) => 
2155                    continue,
2156                Err(e) =>
2157                    panic!("{}", e),
2158            }
2159
2160            
2161        }
2162
2163        println!("{:?}", a_cnt);
2164
2165        assert!(a_cnt[0] == 5);
2166        assert!(a_cnt[1] == 2);
2167        assert!((a_cnt[2] == 10 || a_cnt[2] == 11));
2168        assert!(a_cnt[3] == 27);
2169        assert!(a_cnt[4] == 1);
2170
2171        task5_guard.reschedule_task(PeriodicTaskTime::exact_time(AbsoluteTime::now() + RelativeTime::new_time(0, 500_000_000))).unwrap();
2172
2173        let end = AbsoluteTime::now() + RelativeTime::new_time(0, 600_000_000);
2174
2175        while AbsoluteTime::now() < end
2176        {
2177            match recv.recv_timeout(Duration::from_millis(1))
2178            {
2179                Ok(rcv_a) => 
2180                    a_cnt[rcv_a as usize] += 1,
2181                Err(RecvTimeoutError::Timeout) => 
2182                    continue,
2183                Err(e) =>
2184                    panic!("{}", e),
2185            }
2186        }
2187
2188        println!("{:?}", a_cnt);
2189        assert!(a_cnt[4] == 2);
2190
2191        drop(task5_guard);
2192        drop(task4_guard);
2193        drop(task3_guard);
2194        drop(task2_guard);
2195
2196        let end = AbsoluteTime::now() + RelativeTime::new_time(2, 1000);
2197
2198        while AbsoluteTime::now() < end
2199        {
2200            match recv.recv_timeout(Duration::from_millis(1))
2201            {
2202                Ok(rcv_a) => 
2203                    a_cnt[rcv_a as usize] += 1,
2204                Err(RecvTimeoutError::Timeout) => 
2205                    continue,
2206                Err(e) =>
2207                    panic!("{}", e),
2208            }
2209        }
2210
2211
2212        println!("{:?}", a_cnt);
2213        assert_eq!(a_cnt[4], 2);
2214        assert_eq!(a_cnt[0], 8);
2215
2216        drop(task1_guard);
2217
2218        std::thread::sleep(Duration::from_millis(10));
2219        return;
2220    }
2221}