timer_deque_rs/periodic_task/
sync_tasks.rs

1/*-
2 * timer-deque-rs - a Rust crate which provides timer and timer queues based on target OS
3 *  functionality.
4 * 
5 * Copyright (C) 2025 Aleksandr Morozov alex@nixd.org
6 *  4neko.org alex@4neko.org
7 * 
8 * The timer-rs crate can be redistributed and/or modified
9 * under the terms of either of the following licenses:
10 *
11 *   1. the Mozilla Public License Version 2.0 (the “MPL”) OR
12 *                     
13 *   2. EUROPEAN UNION PUBLIC LICENCE v. 1.2 EUPL © the European Union 2007, 2016
14 */
15
16use std::
17{
18    cell::{OnceCell, RefCell}, collections::HashMap, fmt, mem, num::NonZeroUsize, os::fd::{AsFd, AsRawFd, BorrowedFd, RawFd}, sync::
19    {
20        atomic::{AtomicBool, Ordering}, 
21        mpsc::{self}, 
22        Arc, 
23        Mutex, 
24        TryLockError, 
25        Weak
26    }, thread::JoinHandle, time::Duration
27};
28
29use crossbeam_deque::{Injector, Steal};
30use rand::random_range;
31
32use crate::
33{
34    error::{TimerError, TimerErrorType, TimerResult}, 
35    map_timer_err, 
36    timer_portable::{poll::PollInterrupt, timer::TimerFd, PollEventType, TimerExpMode, TimerFlags, TimerType}, 
37    AbsoluteTime, 
38    FdTimerCom, 
39    RelativeTime, 
40    TimerPoll, 
41    TimerReadRes
42};
43
44/// A result of the task execution which is returned by the user task.
45/// See description for the each variant.
46/// 
47/// The task
48#[derive(Debug, Clone, Copy, PartialEq, Eq)]
49pub enum PeriodicTaskResult
50{
51    /// The task exec was ok. No errors.
52    Ok,
53
54    /// Cancels (but not removing) the task due to error or for other reason.
55    /// By returning this result the timer is unset to stop generating events. 
56    /// But, if since the `AbortTask` was received and a timer has timed out again 
57    /// the event will be called again. It is not necessary to return the `AbortTask` again, 
58    /// just return [PeriodicTaskResult::Ok] exiting imidiatly.
59    CancelTask,
60
61    /// A request to reschedule the task to a new time or interval. The `0` argument 
62    /// should contain new time. The timer mode can be changed.
63    TaskReSchedule(PeriodicTaskTime)
64}
65
66/// A trait which should be implemented by the instance which will be added as a `task`.
67/// 
68/// * the `instance` must implement [Send] because it may be moved to other thread.
69/// 
70/// * it is not necessary that the `instance` to impl [Sync] because the `instance` will never
71/// be executed from two different threads and it is already protected by mutex.
72/// 
73/// A `task` should never block the thread i.e call some functions which may block for a large
74/// period of time! 
75pub trait PeriodicTask: Send + fmt::Debug + 'static
76{
77    /// A task entry point. The task should return a [PeriodicTaskResult].
78    fn exec(&mut self) -> PeriodicTaskResult;
79}
80
81/// An alias for the boxed value.
82pub type PeriodicTaskHndl = Box<dyn PeriodicTask>;
83
84
85/// A global FIFO commands.
86/// 
87/// For all variants, the option [Option] [mpsc::Sender] is used for the error feedback and if
88/// it is set to [Option::None] the error will not be reported.
89#[derive(Debug)]
90pub(crate) enum GlobalTasks
91{
92    /// Adds task to the task system. If error happens it will be reported over the `1` and item `0`
93    /// will not be added to the instance.
94    AddTask( PeriodicTaskTicket, Option<mpsc::Sender<TimerResult<()>>> ),
95
96    /// Removes the task from the system.
97    RemoveTask( Arc<PeriodicTaskGuardInner>, Option<mpsc::Sender<TimerResult<()>>> ),
98
99    /// Changes the timer value.
100    ReschedTask( Weak<PeriodicTaskGuardInner>, PeriodicTaskTime, Option<mpsc::Sender<TimerResult<()>>> ),
101
102    /// Suspends the task but does not remove the task. The task which is planned to suspend, if it is already
103    /// in the exec queue, will be executed.
104    SuspendTask( Weak<PeriodicTaskGuardInner>, Option<mpsc::Sender<TimerResult<()>>> ),
105
106    /// Resumes the task by enabling timer.
107    ResumeTask( Weak<PeriodicTaskGuardInner>, Option<mpsc::Sender<TimerResult<()>>> ),
108}
109
110/// A local task queue commands.
111#[derive(Debug)]
112pub enum ThreadTask
113{
114    /// Execute the task.
115    TaskExec( Arc<NewTaskTicket> )
116}
117
118/// A ticket which is assigned to specific task in order to send the same task to the same
119/// thread which is already assigned. If only one thread is allocated, then this is not
120/// really necessary (will be optimized in future).
121#[derive(Debug)]
122pub struct NewTaskTicket
123{
124    /// A thread handler of the specific thread.
125    task_thread: Arc<ThreadHandler>,
126    ptgi: Weak<PeriodicTaskGuardInner>,
127}
128
129impl NewTaskTicket
130{
131    fn new(task_thread: Arc<ThreadHandler>, ptgi: Weak<PeriodicTaskGuardInner>) -> Self
132    {
133        return 
134            Self
135            {
136                task_thread: 
137                    task_thread,
138                ptgi:
139                    ptgi
140            };
141    }
142
143    /// Sends the task on exec to the local queue of the thread.
144    fn send_task(this: Arc<NewTaskTicket>, task_rep_count: u64, thread_hndl_cnt: usize)
145    {
146        let strong_cnt = Arc::strong_count(&this);
147        let thread = this.task_thread.clone();
148
149        for _ in 0..task_rep_count
150        {
151            thread.send_task(this.clone(), strong_cnt < 2 && thread_hndl_cnt > 1);
152        }
153    }
154}
155
156/// The internal structure which is stored in pair with the timer's FD. Contains necessary references
157/// and values.
158#[derive(Debug)]
159pub(crate) struct PeriodicTaskTicket
160{
161    /// A task name.
162    task_name: String,
163
164    /// A time which was set to timer. Needed in case if timer was cancelled ny OS or suspended.
165    ptt: PeriodicTaskTime,
166
167    /// A [Weak] reference to the current [NewTaskTicket] which identifies to which thread the
168    /// task exec was assigned. The [Arc] of [NewTaskTicket] is clonned for each task exec round.
169    /// If is not `upgradable` the task is no logner assigned to thread.
170    weak_ticket: Weak<NewTaskTicket>,
171
172    /// A [Weak] reference to the [PeriodicTaskGuardInner]. If this reference is not `upgradable` 
173    /// then the task is no logner valid.
174    ptg: Weak<PeriodicTaskGuardInner>,
175}
176
177impl PeriodicTaskTicket
178{
179    fn new(task_name: String, ptt: PeriodicTaskTime, ptg: Weak<PeriodicTaskGuardInner>) -> Self
180    {
181        return 
182            Self
183            {
184                task_name: task_name,
185                ptt: ptt,
186                weak_ticket: Weak::new(),
187                ptg: ptg,
188            };
189    }
190
191
192    fn get_task_guard(&self) -> TimerResult<Arc<PeriodicTaskGuardInner>>
193    {
194        return 
195            self
196                .ptg
197                .upgrade()
198                .ok_or_else(||
199                    map_timer_err!(TimerErrorType::ReferenceGone, "task: '{}' reference to timer has gone", 
200                        self.task_name)
201                );
202    }
203
204    fn get_timer_time(&self) -> &PeriodicTaskTime
205    {
206        return &self.ptt;
207    }
208}
209
210/// An instance which is returned by the [SyncPeriodicTasks::add] whuch guards the 
211/// task and allows to control its state. If task is no longer needed the instance
212/// can be dropped and it will be removed from the system.
213#[derive(Debug)]
214pub struct PeriodicTaskGuard
215{
216    /// A task name
217    task_name: String,
218
219    /// A [Arc] to [PeriodicTaskGuardInner] which contains timer and other things. 
220    /// Normally this is the base instance i.e reference and if it is dropped
221    /// it indicates to the task `executor` that the instance is no logner valid.
222    /// The autoremoval is perfomed via sending the `command` over the global
223    /// qeueu.
224    /// 
225    /// The [Option] is needed to `take` the instance.
226    guard: Option<Arc<PeriodicTaskGuardInner>>,
227
228    /// A clonned instance to the executor spawner which contains the reference 
229    /// to channel.
230    spt: Arc<SyncPeriodicTasksInner>
231}
232
233impl Drop for PeriodicTaskGuard
234{
235    fn drop(&mut self) 
236    {
237        let guard = self.guard.take().unwrap();
238
239        let _ = self.spt.send_global_cmd(GlobalTasks::RemoveTask(guard, None));
240
241        return;
242    }
243}
244
245impl PeriodicTaskGuard
246{
247    /// Requests the task rescheduling - changine the timer time or mode. 
248    /// The `task` is represented by the calling instance. 
249    /// 
250    /// This function blocks
251    /// the current thread for the maximum (in worst case) 5 seconds which is a 
252    /// timeout for feedback reception.
253    /// 
254    /// # Arguments
255    /// 
256    /// * `ptt` - a new time to be set to timer.
257    /// 
258    /// # Returns 
259    /// 
260    /// A [Result] as alias [TimerResult] is returned.
261    
262    /// In case if error is retuned, the operation should be considered as not completed
263    /// correctly and the executor instance is poisoned i.e a bug happened.
264    /// 
265    /// The common errors may be retuned:
266    /// 
267    /// * [TimerErrorType::MpscTimeout] - a feedback (with the result) reception timeout.
268    ///     Probably this is because the task executor is too busy.
269    /// 
270    /// * [TimerErrorType::TimerError] - with the timer error.
271    /// 
272    /// * [TimerErrorType::NotFound] - if instance was not found in the internal records. 
273    ///     Should not happen. If appears, then probably this is a bug.
274    pub 
275    fn reschedule_task(&self, ptt: PeriodicTaskTime) -> TimerResult<()>
276    {
277        let weak_ptgi = Arc::downgrade(self.guard.as_ref().unwrap());
278
279        let (snd, rcv) = mpsc::channel::<Result<(), TimerError>>();
280
281        self.spt.send_global_cmd(GlobalTasks::ReschedTask(weak_ptgi, ptt, Some(snd)))?;
282
283        return 
284            rcv
285                .recv_timeout(Duration::from_secs(10))
286                .map_err(|e|
287                    map_timer_err!(TimerErrorType::MpscTimeout, "reschedule_task(), task name: '{}', timer '{}' MPSC rcv timeout error: '{}'", 
288                        self.task_name, self.guard.as_ref().unwrap().timer_fd, e)
289                )?;
290    }
291
292    /// Requests to suspent the current `task`.
293    /// 
294    /// This function blocks
295    /// the current thread for the maximum (in worst case) 5 seconds which is a 
296    /// timeout for feedback reception.
297    /// 
298    /// # Returns 
299    /// 
300    /// A [Result] as alias [TimerResult] is returned.
301    /// 
302    /// In case if error is retuned, the operation should be considered as not completed
303    /// correctly and the executor instance is poisoned i.e a bug happened.
304    /// 
305    /// The common errors may be retuned:
306    /// 
307    /// * [TimerErrorType::MpscTimeout] - a feedback (with the result) reception timeout.
308    ///     Probably this is because the task executor is too busy.
309    /// 
310    /// * [TimerErrorType::TimerError] - with the timer error.
311    pub 
312    fn suspend_task(&self) -> TimerResult<()>
313    {
314        let weak_ptgi = Arc::downgrade(self.guard.as_ref().unwrap());
315
316        let (snd, rcv) = mpsc::channel::<Result<(), TimerError>>();
317
318        self.spt.send_global_cmd(GlobalTasks::SuspendTask(weak_ptgi, Some(snd)))?;
319
320        return 
321            rcv
322                .recv_timeout(Duration::from_secs(10))
323                .map_err(|e|
324                    map_timer_err!(TimerErrorType::MpscTimeout, "suspend_task(), task name: '{}', timer '{}' MPSC rcv timeout error: '{}'", 
325                        self.task_name, self.guard.as_ref().unwrap().timer_fd, e)
326                )?;
327    }
328
329    /// Requests to resume the task from the suspend state.
330    /// 
331    /// This function blocks
332    /// the current thread for the maximum (in worst case) 5 seconds which is a 
333    /// timeout for feedback reception.
334    /// 
335    /// # Returns 
336    /// 
337    /// A [Result] as alias [TimerResult] is returned.
338    /// 
339    /// In case if error is retuned, the operation should be considered as not completed
340    /// correctly and the executor instance is poisoned i.e a bug happened.
341    /// 
342    /// The common errors may be retuned:
343    /// 
344    /// * [TimerErrorType::MpscTimeout] - a feedback (with the result) reception timeout.
345    ///     Probably this is because the task executor is too busy.
346    /// 
347    /// * [TimerErrorType::TimerError] - with the timer error.
348    pub 
349    fn resume_task(&self) -> TimerResult<()>
350    {
351        let weak_ptgi = Arc::downgrade(self.guard.as_ref().unwrap());
352
353        let (snd, rcv) = mpsc::channel::<Result<(), TimerError>>();
354
355        self.spt.send_global_cmd(GlobalTasks::ResumeTask(weak_ptgi, Some(snd)))?;
356
357        return 
358            rcv
359                .recv_timeout(Duration::from_secs(10))
360                .map_err(|e|
361                    map_timer_err!(TimerErrorType::MpscTimeout, "resume_task(), task name: '{}', timer '{}' MPSC rcv timeout error: '{}'", 
362                        self.task_name, self.guard.as_ref().unwrap().timer_fd, e)
363                )?;
364    }
365}
366
367/// Programs the taks's timer to specific time and mode. This instance is
368/// a wrapper around the [TimerExpMode] as this struct requers the generic to
369/// be specified which defines the type of the time.
370#[derive(Debug, Clone, Copy, PartialEq, Eq)]
371pub enum PeriodicTaskTime
372{
373    /// A provided time is absolute time in future.
374    Absolute(TimerExpMode<AbsoluteTime>),
375
376    /// A provided time is relative to current time.
377    Relative(TimerExpMode<RelativeTime>),
378}
379
380impl From<TimerExpMode<AbsoluteTime>> for PeriodicTaskTime
381{
382    fn from(value: TimerExpMode<AbsoluteTime>) -> Self 
383    {
384        return Self::Absolute(value);
385    }
386}
387
388impl From<TimerExpMode<RelativeTime>> for PeriodicTaskTime
389{
390    fn from(value: TimerExpMode<RelativeTime>) -> Self 
391    {
392        return Self::Relative(value);
393    }
394}
395
396impl fmt::Display for PeriodicTaskTime
397{
398    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result 
399    {
400        match self
401        {
402            Self::Absolute(t) => 
403                write!(f, "{}", t),
404            Self::Relative(t) => 
405                write!(f, "{}", t),
406        }
407    }
408}
409
410impl PeriodicTaskTime
411{
412    /// The timer is set to time up once and for the specific `absolute` time in future 
413    /// (to current realclock time).
414    #[inline]
415    pub 
416    fn exact_time(abs_time: AbsoluteTime) -> Self
417    {
418        return Self::Absolute(TimerExpMode::<AbsoluteTime>::new_oneshot(abs_time));
419    }
420
421    /// The timer is set to time up in the interval for the `relative` time.
422    #[inline]
423    pub 
424    fn interval(rel_time: RelativeTime) -> Self
425    {
426        return Self::Relative(TimerExpMode::<RelativeTime>::new_interval(rel_time));
427    }
428
429    /// The timer is set to time up in the interval for the `relative` time with the initial
430    /// delay.
431    /// 
432    /// # Arguments
433    /// 
434    /// * `start_del_time` - a [RelativeTime] of the initial delay.
435    /// 
436    /// * `rel_int_time` - a [RelativeTime] of the interval.
437    #[inline]
438    pub 
439    fn interval_with_start_delay(start_del_time: RelativeTime, rel_int_time: RelativeTime) -> Self
440    {
441        return Self::Relative(TimerExpMode::<RelativeTime>::new_interval_with_init_delay(start_del_time, rel_int_time));
442    }
443
444    /// Sets the `timer_fd` instance to the specific value stored in the current instance.
445    fn set_timer(&self, timer_fd: &TimerFd) -> TimerResult<()>
446    {
447        match *self
448        {
449            Self::Absolute(timer_exp_mode) => 
450                return 
451                    timer_fd
452                        .set_time(timer_exp_mode)
453                        .map_err(|e|
454                            map_timer_err!(TimerErrorType::TimerError(e.get_errno()), "cannot set time '{}' for timer: '{}'", timer_exp_mode, timer_fd )
455                        ),
456            Self::Relative(timer_exp_mode) => 
457                return 
458                    timer_fd
459                        .set_time(timer_exp_mode)
460                        .map_err(|e|
461                            map_timer_err!(TimerErrorType::TimerError(e.get_errno()), "cannot set time '{}' for timer: '{}'", timer_exp_mode, timer_fd )
462                        ),
463        }
464    }
465}
466
467
468/// A instane which contains the task's timer and a task.
469#[derive(Debug)]
470pub(crate) struct PeriodicTaskGuardInner
471{
472    /// A task's timer.
473    timer_fd: TimerFd,
474
475    /// A task itself. At the moment it is mutexed in order to provide
476    /// the mutability, because the current instance is wrapped into [Arc].
477    /// But, the task is never executed from different threads and mutex
478    /// should be replaced with something that is [Send] and can provide
479    /// mutable reference.
480    task: Mutex<PeriodicTaskHndl>,
481}
482
483impl fmt::Display for PeriodicTaskGuardInner
484{
485    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result 
486    {
487        write!(f, "timer: '{}'", self.timer_fd)
488    }
489}
490
491impl AsFd for PeriodicTaskGuardInner
492{
493    fn as_fd(&self) -> BorrowedFd<'_> 
494    {
495        return self.timer_fd.as_fd();
496    }
497}
498
499impl PeriodicTaskGuardInner
500{
501    fn new(timer_name: String, task_inst: PeriodicTaskHndl) -> TimerResult<Self>
502    {
503        let timer = 
504            TimerFd::new(timer_name.clone().into(), TimerType::CLOCK_REALTIME, 
505                TimerFlags::TFD_CLOEXEC | TimerFlags::TFD_NONBLOCK)
506                .map_err(|e|
507                    map_timer_err!(TimerErrorType::TimerError(e.get_errno()), "cannot setup timer for task: '{}'", timer_name)
508                )?;
509
510        return Ok(Self { timer_fd: timer, task: Mutex::new(task_inst) });
511    }
512
513    #[inline]
514    fn setup_timer(&self, task_time_set: &PeriodicTaskTime) -> TimerResult<()>
515    {
516        return task_time_set.set_timer(&self.timer_fd);
517    }
518
519    #[inline]
520    fn unset_timer(&self) -> TimerResult<()>
521    {
522        return 
523            self
524                .timer_fd
525                .unset_time()
526                .map_err(|e|
527                    map_timer_err!(TimerErrorType::TimerError(e.get_errno()), "unsetting timer '{}' returned error: {}", self.timer_fd, e)
528                );
529    }
530}
531
532/// Internal structure for every worker thread.
533struct ThreadWorker
534{
535    /// A thread name, not task name.
536    thread_name: String,
537
538    /// An [Injector] queue for the global tasks received from other threads.
539    global_task_injector: Arc<Injector<GlobalTasks>>,
540
541    /// An [Injector] queue for local (specific to current thread) tasks.
542    local_thread_inj: Arc<Injector<ThreadTask>>,
543
544    /// A flag which is used to control the thread's loop. Is set to `false`
545    /// when thread should exit.
546    thread_run_flag: Arc<AtomicBool>,
547
548    /// A shared instance of the [SyncPeriodicTasksInner] which holds all information
549    /// about tasks and FD polling. The thread which aquired the mutex first will
550    /// process the `global_task_injector` queue and poll the timers for the events which
551    /// will be shared over with other threads.
552    spti: Arc<Mutex<SharedPeriodicTasks>>,
553
554    /// A last thread to which the task was assigned.
555    thread_last_id: usize,
556
557    /// A poll interrupt signal sender.
558    poll_int: PollInterrupt
559}
560
561impl ThreadWorker
562{
563    fn new(thread_name: String, global_task_injector: Arc<Injector<GlobalTasks>>, 
564        spti: Arc<Mutex<SharedPeriodicTasks>>, poll_int: PollInterrupt) -> TimerResult<ThreadHandler>
565    {
566        let local_thread_inj = Arc::new(Injector::<ThreadTask>::new());
567        let thread_run_flag = Arc::new(AtomicBool::new(true));
568        let thread_run_flag_weak = Arc::downgrade(&thread_run_flag);
569
570        let worker = 
571            ThreadWorker
572            {
573                thread_name: 
574                    thread_name.clone(),
575                global_task_injector: 
576                    global_task_injector,
577                local_thread_inj:
578                    local_thread_inj.clone(),
579                thread_run_flag: 
580                    thread_run_flag,
581                spti:
582                    spti,
583                thread_last_id:
584                    0,
585                poll_int:
586                    poll_int
587            };
588
589        let thread_hndl =
590            std::thread::Builder::new()
591                .name(thread_name)
592                .spawn(|| worker.worker())
593                .map_err(|e|
594                    map_timer_err!(TimerErrorType::SpawnError(e.kind()), "{}", e)
595                )?;
596            
597
598        return Ok( ThreadHandler::new(thread_hndl, local_thread_inj, thread_run_flag_weak) );
599    }
600
601
602    fn worker(mut self) -> TimerResult<()>
603    {
604        // force to park to allow the main thread to initialize everything
605        std::thread::park();
606
607        while self.thread_run_flag.load(Ordering::Acquire) == true
608        {
609            // check local queue for the task or token
610            while let Steal::Success(task) = self.local_thread_inj.steal()
611            {
612                match task
613                {
614                    ThreadTask::TaskExec(task_exec) =>
615                    {
616                        let Some(ptgi) = task_exec.ptgi.upgrade()
617                            else
618                            {   
619                                // task was removed while was in queue.
620                                continue;
621                            };
622
623                        // call task
624                        match ptgi.task.lock().unwrap().exec()
625                        {
626                            PeriodicTaskResult::Ok => 
627                                {},
628                            PeriodicTaskResult::CancelTask => 
629                            {
630                                self
631                                    .global_task_injector
632                                    .push(GlobalTasks::SuspendTask(task_exec.ptgi.clone(), None));
633
634                                let _ = self.poll_int.aquire().map(|v| v.interrupt_drop());
635                            },
636                            PeriodicTaskResult::TaskReSchedule(ptt) => 
637                            {
638                                self
639                                    .global_task_injector
640                                    .push(GlobalTasks::ReschedTask(task_exec.ptgi.clone(), ptt, None));
641
642                                let _ = self.poll_int.aquire().map(|v| v.interrupt_drop());
643                            }
644                        }
645
646                        drop(task_exec);
647                    }
648                }
649            }
650
651            let spti_lock_res =  self.spti.try_lock();
652
653            if let Ok(mut task_token) = spti_lock_res
654            {
655                let thread_hndl_cnt = task_token.thread_pool.get().unwrap().len();
656
657                // check global task queue
658                while let Steal::Success(task) = self.global_task_injector.steal()
659                {
660                    match task
661                    {
662                        GlobalTasks::AddTask(ptt, opt_err_ret) => 
663                        {
664
665                            let timer = 
666                                match ptt.get_task_guard()
667                                {
668                                    Ok(r) =>
669                                    {
670                                        r
671                                    },
672                                    Err(e) => 
673                                    {
674                                        if let Some(err_ret) = opt_err_ret
675                                        {
676                                            let _ = err_ret.send(Err(e));
677                                        }
678
679                                        continue;
680                                    }
681                                };
682
683                            // setup timer
684                            if let Err(e) = timer.setup_timer(ptt.get_timer_time())
685                            {
686                                if let Some(err_ret) = opt_err_ret
687                                {
688                                    let _ = err_ret.send(Err(e));
689                                }
690
691                                continue;
692                            }
693
694                            // add to poll
695                            if let Err(e) = task_token.timers_poll.add(&timer.timer_fd)
696                            {
697                                if let Some(err_ret) = opt_err_ret
698                                {
699                                    let _ = err_ret.send(Err(e));
700                                }
701
702                                continue;
703                            }
704
705                                    
706                            let timer_fd = timer.as_fd().as_raw_fd();
707                            
708                            task_token.tasks.insert(timer_fd, RefCell::new(ptt));
709                            
710                            if let Some(err_ret) = opt_err_ret
711                            {
712                                let _ = err_ret.send(Ok(()));
713                            }
714                        },
715                        GlobalTasks::RemoveTask(ptg_arc, opt_err_ret) => 
716                        {
717                            let _ = ptg_arc.timer_fd.unset_time();
718
719                            if let Some(_v) = task_token.tasks.remove(&ptg_arc.as_fd().as_raw_fd())
720                            {
721                                let res = task_token.timers_poll.delete(&ptg_arc.timer_fd);
722
723                                if let Some(err_ret) = opt_err_ret.as_ref()
724                                {
725                                    let _ = err_ret.send(res);
726                                }
727                            }
728                            else
729                            {
730                                if let Some(err_ret) = opt_err_ret
731                                {
732                                    let _ = err_ret.send(Ok(()));
733                                }
734                            }
735
736                            drop(ptg_arc);
737                        },
738                        GlobalTasks::ReschedTask( ptg_weak, ptt, opt_err_ret ) =>
739                        {
740                            let Some(ptg_arc) = ptg_weak.upgrade()
741                                else
742                                {   
743                                    // task was removed while was in queue.
744                                    continue;
745                                };
746
747                            if let Err(e) = ptg_arc.setup_timer(&ptt)
748                            {
749                                if let Some(err_ret) = opt_err_ret.as_ref()
750                                {
751                                    let _ = err_ret.send(Err(e));
752                                }
753
754                                continue;
755                            }
756
757                            // replace the time
758                            let res_task = 
759                                task_token
760                                    .tasks
761                                    .get(&ptg_arc.timer_fd.as_raw_fd())
762                                    .ok_or_else(||
763                                        map_timer_err!(TimerErrorType::NotFound, 
764                                            "thread: '{}', timer with FD: '{}' was not found", self.thread_name, ptg_arc.timer_fd)
765                                    );
766
767                            let res = 
768                                match res_task
769                                {
770                                    Ok(task) => 
771                                    {
772                                        let _ = mem::replace(&mut task.borrow_mut().ptt, ptt); 
773
774                                        Ok(())
775                                    },
776                                    Err(err) => 
777                                    {
778                                        Err(err)
779                                    }
780                                };
781
782                            if let Some(err_ret) = opt_err_ret
783                            {
784                                let _ = err_ret.send(res);
785                            }
786                        },
787                        GlobalTasks::SuspendTask(ptg_weak, opt_err_ret) =>
788                        {
789                            let Some(ptg_arc) = ptg_weak.upgrade()
790                                else
791                                {   
792                                    // task was removed while was in queue.
793                                    continue;
794                                };
795
796                            let res = ptg_arc.unset_timer();
797
798                            if let Some(err_ret) = opt_err_ret
799                            {
800                                let _ = err_ret.send(res);
801                            }
802                        },
803                        GlobalTasks::ResumeTask(ptg_weak, opt_err_ret) =>
804                        {
805                            let Some(ptg_arc) = ptg_weak.upgrade()
806                                else
807                                {   
808                                    // task was removed while was in queue.
809                                    continue;
810                                };
811
812                            let res_task = 
813                                task_token
814                                    .tasks
815                                    .get(&ptg_arc.timer_fd.as_raw_fd())
816                                    .ok_or_else(||
817                                        map_timer_err!(TimerErrorType::NotFound, 
818                                            "thread: '{}', timer with FD: '{}' was not found", self.thread_name, ptg_arc.timer_fd)
819                                    );
820
821                            if let Err(e) = res_task
822                            {
823                                if let Some(err_ret) = opt_err_ret.as_ref()
824                                {
825                                    let _ = err_ret.send(Err(e));
826                                }
827
828                                continue;
829                            }
830
831                            let res = 
832                                ptg_arc.setup_timer(res_task.unwrap().borrow().get_timer_time());
833
834                            if let Some(err_ret) = opt_err_ret.as_ref()
835                            {
836                                let _ = err_ret.send(res);
837                            }
838                        }
839                    }
840                }
841
842                // poll
843                let res = task_token.timers_poll.poll(Some(5000))?;
844
845
846                for event in res.into_inner()
847                {
848                    match event
849                    {
850                        PollEventType::Some(timer_fd) => 
851                        {
852                            // resolve fd into task
853                            let task = 
854                                task_token
855                                    .tasks
856                                    .get(&timer_fd)
857                                    .ok_or_else(||
858                                        map_timer_err!(TimerErrorType::NotFound, 
859                                            "thread: '{}', timer with FD: '{}' was not found", self.thread_name, timer_fd)
860                                    )?;
861
862                            // check that PeriodicTaskGuard is ok
863                            let Some(ptg_arc) = task.borrow().ptg.upgrade()
864                                else
865                                {
866                                    // if Arc cannot be upgraded, then remove the task as removed
867                                    task_token.tasks.remove(&timer_fd);
868
869                                    // continue event handling
870                                    continue;
871                                };
872
873                            let timer_rd_res= 
874                                ptg_arc
875                                    .timer_fd
876                                    .read()
877                                    .map_err(|e|
878                                        map_timer_err!(TimerErrorType::TimerError(e.get_errno()), 
879                                            "thread: '{}', timer with FD: '{}' err: {}", self.thread_name, timer_fd, e)
880                                    )?;
881
882                            // check timer result
883                            let overflow_cnt: u64 = 
884                                match timer_rd_res
885                                {
886                                    TimerReadRes::Ok(overfl) => 
887                                    {
888                                        overfl
889                                    },
890                                    TimerReadRes::Cancelled => 
891                                    {
892                                        // the system time was modified, resched instance
893
894                                        self
895                                            .global_task_injector
896                                            .push(
897                                                GlobalTasks::ReschedTask(task.borrow().ptg.clone(), task.borrow().ptt.clone(), None)
898                                            );
899
900                                        // timer was reset continue
901                                        continue;
902                                    },
903                                    TimerReadRes::WouldBlock => 
904                                    {
905                                        // should not happen, silently ignore or panic?
906                                        panic!("assertion trap: timer retuned WouldBlock, {}", ptg_arc);
907                                    }
908                                };
909
910                            // check if ticket presents, if not create
911                            let ticket_arc_opt = task.borrow().weak_ticket.upgrade();
912
913
914                            let ticket = 
915                                match ticket_arc_opt
916                                {
917                                    Some(ticket) => 
918                                        ticket,
919                                    None => 
920                                    {    
921                                        let task_thread = 
922                                            {
923                                                /*let thread_local_hnd = 
924                                                    task_token
925                                                        .thread_pool
926                                                        .get()
927                                                        .unwrap()[self.thread_last_id]
928                                                        .get_thread_handler();*/
929
930                                               
931
932                                                self.thread_last_id = (self.thread_last_id + 1) % thread_hndl_cnt;
933
934                                                //thread_local_hnd
935                                                task_token.clone_thread_handler(self.thread_last_id)
936                                            }; 
937
938                                        let ticket =
939                                            {
940                                                //let b_task = task.borrow();
941                                                Arc::new(
942                                                    NewTaskTicket::new(task_thread, task.borrow().ptg.clone())
943                                                )
944                                            };
945
946                                        task.borrow_mut().weak_ticket = Arc::downgrade(&ticket);
947
948                                        ticket
949                                    }
950                                };
951                            
952
953                            
954                            NewTaskTicket::send_task(ticket, overflow_cnt, thread_hndl_cnt);
955                        },
956                        PollEventType::TimerRemoved(timer_fd) =>
957                        {
958                            // remove task
959                            task_token.tasks.remove(&timer_fd);
960                        },
961                        PollEventType::SubError(_timer_error) => 
962                        {
963                            // ignore
964                        },
965                    }
966                } // for
967            }
968            else if let Err(TryLockError::WouldBlock) = spti_lock_res
969            {
970                if self.thread_run_flag.load(Ordering::Acquire) == false
971                {
972                    return Ok(());
973                }
974
975                if self.local_thread_inj.is_empty() == true
976                {
977                    std::thread::park_timeout(Duration::from_secs(2));
978                }
979            } 
980            
981        } // while
982
983        return Ok(());
984    }
985}
986
987
988/// A common instance which contains the specific thread handler and
989/// a [Weak] reference to the loop controlling flag.
990#[derive(Debug)]
991struct ThreadHandler
992{
993    /// A thread join handler
994    hndl: JoinHandle<TimerResult<()>>,
995
996    /// A local task injector.
997    task_injector: Arc<Injector<ThreadTask>>,
998
999    /// A flag which controls the thread loop. Initialized as `true`.
1000    thread_flag: Weak<AtomicBool>,
1001}
1002
1003impl ThreadHandler
1004{
1005    fn new(hndl: JoinHandle<TimerResult<()>>, task_injector: Arc<Injector<ThreadTask>>, thread_flag: Weak<AtomicBool>) -> Self
1006    {            
1007        return 
1008            Self
1009            {
1010                hndl,
1011                task_injector,
1012                thread_flag: thread_flag
1013            };
1014    }
1015
1016    fn stop(&self)
1017    {
1018        if let Some(v) = self.thread_flag.upgrade()
1019        {
1020            v.store(false, Ordering::Release);
1021        }
1022    }
1023
1024    fn unpark(&self) 
1025    {
1026        self.hndl.thread().unpark();
1027    }
1028
1029    fn send_task(&self, task: Arc<NewTaskTicket>, unpark: bool)
1030    {
1031        self.task_injector.push(ThreadTask::TaskExec(task));
1032
1033        if unpark == true
1034        {
1035            self.hndl.thread().unpark();
1036        }
1037
1038        return;
1039    }
1040
1041    fn clean_local_queue(&self)
1042    {
1043        while let Steal::Success(_) = self.task_injector.steal() {}
1044
1045        return;
1046    }
1047}
1048
1049/// A instance which contains all information about the threads allocated for the
1050/// task execution, tasks and timer polling. An instance is shared with all threads.
1051#[derive(Debug)]
1052pub struct SharedPeriodicTasks
1053{
1054    /// A list of the threads. A [OnceCell] is used to initialize the field once.
1055    thread_pool: OnceCell<Arc<Vec<Arc<ThreadHandler>>>>,
1056
1057    /// A [HashMap] of the regestered tasks. Each task is mapped to its timer FD.
1058    tasks: HashMap<RawFd, RefCell<PeriodicTaskTicket>>,
1059
1060    /// A timer event poll. 
1061    timers_poll: TimerPoll,
1062}
1063
1064
1065impl SharedPeriodicTasks
1066{ 
1067    fn new() -> TimerResult<Self>
1068    {
1069
1070        return Ok( 
1071            Self 
1072            { 
1073                thread_pool: OnceCell::default(),
1074                tasks: HashMap::new(),
1075                timers_poll: TimerPoll::new()?
1076            }
1077        );
1078    }
1079
1080    fn clone_thread_handler(&self, thread_last_id: usize) -> Arc<ThreadHandler>
1081    {
1082        let thread_local_hnd = 
1083            self
1084                .thread_pool
1085                .get()
1086                .unwrap()[thread_last_id]
1087                .clone();
1088
1089        return thread_local_hnd;
1090    }
1091}
1092
1093#[derive(Debug)]
1094pub struct SyncPeriodicTasksInner
1095{
1096    /// A [Weak] reference to the [EventFd] of the timer `poll` which is used to interrupt the polling.
1097    poll_int: PollInterrupt,
1098
1099    /// A `global` FIFO which is used to send commands to the task executor.
1100    task_injector: Arc<Injector<GlobalTasks>>,
1101}
1102
1103impl SyncPeriodicTasksInner
1104{
1105    fn send_global_cmd(&self, glob: GlobalTasks) -> TimerResult<()>
1106    {
1107        let poll_int = 
1108            self.poll_int.aquire()?;
1109                
1110        self.task_injector.push(glob);
1111
1112        poll_int.interrupt_drop()?;
1113
1114        return Ok(());
1115    }
1116
1117    fn clear_global_queue(&self)
1118    {
1119        while let Steal::Success(_) = self.task_injector.steal() {}
1120
1121        return;
1122    }
1123}
1124
1125/// A `main` instance which spawns the task executor.
1126/// 
1127/// ```ignore
1128/// let spt = SyncPeriodicTasks::new(1.try_into().unwrap()).unwrap();
1129/// let task1 = TaskStruct1::new(2, send);
1130/// let task1_ptt = PeriodicTaskTime::Relative(TimerExpMode::<RelativeTime>::new_interval(RelativeTime::new_time(1, 0)));
1131/// let task1_guard = s.add("task1", task1, task1_ptt).unwrap();
1132/// ```
1133/// 
1134/// This instance should be kept somewhere and dropped only if the task executor WITH
1135/// all spawned tasks are no longer needed.
1136#[derive(Debug, Clone)]
1137pub struct SyncPeriodicTasks
1138{
1139    threads: Option<Arc<Vec<Arc<ThreadHandler>>>>,
1140
1141    inner: Arc<SyncPeriodicTasksInner>,
1142}
1143
1144impl Drop for SyncPeriodicTasks
1145{
1146    fn drop(&mut self) 
1147    {
1148        self.inner.clear_global_queue();
1149
1150        let mut threads = self.threads.take().unwrap();
1151
1152        // stop all threads
1153        for thread in threads.iter()
1154        {
1155            thread.stop();
1156            thread.unpark();
1157        }
1158
1159        // interrupt poll
1160        let _ = self.inner.poll_int.aquire().map(|v| v.interrupt_drop());
1161
1162        for _ in 0..5
1163        {
1164            let threads_unwr = 
1165                match Arc::try_unwrap(threads)
1166                {
1167                    Ok(r) => r,
1168                    Err(e) =>
1169                    {
1170                        threads = e;
1171
1172                        std::thread::sleep(Duration::from_millis(500));
1173
1174                        continue;
1175
1176                    }
1177                };
1178
1179            for thread in threads_unwr
1180            {
1181                thread.clean_local_queue();
1182
1183                let Some(thread) = Arc::into_inner(thread)
1184                else
1185                {
1186                    panic!("assertion trap: ~SyncPeriodicTasks, a reference to ThreadHandler left somewhere");
1187                };
1188
1189                let _ = thread.hndl.join();
1190            }
1191
1192            break;
1193        }
1194    }
1195}
1196
1197
1198impl SyncPeriodicTasks
1199{
1200
1201    
1202    /// Creates new instance. An amount of threads allocated for the task executor
1203    /// should be specified. All threads will be started immidiatly. For small
1204    /// tasks one thread will be enough. For a large amount of tasks, especially it
1205    /// tasks are waken up oftenly then at least two threads should be allocated.
1206    /// 
1207    /// # Arguments
1208    /// 
1209    /// * `threads_cnt` - a [NonZeroUsize] amount of threads.
1210    /// 
1211    /// # Returns
1212    /// 
1213    /// The [Result] is returned as alias [TimerResult].
1214    pub 
1215    fn new(threads_cnt: NonZeroUsize) -> TimerResult<Self>
1216    {
1217        let spti = SharedPeriodicTasks::new()?;
1218        let poll_int = spti.timers_poll.get_poll_interruptor();
1219
1220        // wrap into the mutex because this instance is shared 
1221        let spti = Arc::new(Mutex::new(spti));
1222
1223        let task_injector = Arc::new(Injector::<GlobalTasks>::new());
1224
1225        let mut thread_hndls: Vec<Arc<ThreadHandler>> = Vec::with_capacity(threads_cnt.get());
1226
1227        // spawn threads, all spawn threads will be parked by default.
1228        for i in 0..threads_cnt.get()
1229        {
1230            let handler = 
1231                ThreadWorker::new(format!("timer_exec/{}s", i), task_injector.clone(), spti.clone(), poll_int.clone())?;
1232
1233            thread_hndls.push(Arc::new(handler));
1234        }
1235
1236        let thread_hndls = Arc::new(thread_hndls);
1237
1238        // Lock the instance to initialze the `thread_pool` variable.
1239        let spti_lock = spti.lock().unwrap();
1240        spti_lock.thread_pool.get_or_init(|| thread_hndls.clone());
1241
1242        // get a random thread to unpark it and start the process
1243        let thread = 
1244            spti_lock
1245                .thread_pool
1246                .get()
1247                .unwrap()
1248                .get(random_range(0..threads_cnt.get()))
1249                .unwrap()
1250                .clone();
1251
1252        drop(spti_lock);
1253
1254        // unpark thread
1255        thread.unpark();
1256
1257        let inner = 
1258            SyncPeriodicTasksInner
1259            {
1260                poll_int: poll_int,
1261                task_injector: task_injector,
1262            };
1263
1264        return Ok( 
1265            Self 
1266            { 
1267                threads: Some(thread_hndls),
1268                inner: Arc::new(inner),
1269            }
1270        );
1271    }
1272
1273    /// Adds and spawns the task,
1274    /// 
1275    /// # Arguments
1276    /// 
1277    /// * `task_name` - a task name. used only for identification purposes in debug messages.
1278    /// 
1279    /// * `task` - a task which should be executed. It should implenet [PeriodicTask].
1280    /// 
1281    /// * `task_time` - [PeriodicTaskTime] a time when the task must be spawned.
1282    /// 
1283    /// # Returns
1284    /// 
1285    /// The [Result] is returned as alias [TimerResult].
1286    pub 
1287    fn add<T>(&self, task_name: impl Into<String>, task: T, task_time: PeriodicTaskTime) -> TimerResult<PeriodicTaskGuard>
1288    where T: PeriodicTask
1289    {
1290        let task_int: PeriodicTaskHndl = Box::new(task);
1291
1292        let task_name_str: String = task_name.into();
1293
1294        let period_task_guard = 
1295            Arc::new(PeriodicTaskGuardInner::new(task_name_str.clone(), task_int)?);
1296
1297
1298        let period_task_ticket = 
1299            PeriodicTaskTicket::new(task_name_str.clone(), task_time, Arc::downgrade(&period_task_guard));
1300
1301        let (mpsc_send, mpsc_recv) = mpsc::channel();
1302
1303        self.inner.send_global_cmd(GlobalTasks::AddTask(period_task_ticket, Some(mpsc_send)) )?;
1304
1305        let _ = 
1306            mpsc_recv
1307                .recv()
1308                .map_err(|e|
1309                    map_timer_err!(TimerErrorType::ExternalError, "mpsc error: {}", e)
1310                )??;
1311
1312        let ret = 
1313            PeriodicTaskGuard
1314            {
1315                task_name: task_name_str,
1316                guard: Some(period_task_guard),
1317                spt: self.inner.clone()
1318            };
1319
1320        return Ok(ret);
1321    }
1322
1323    /// Checks if any thread have crashed and no longer works.
1324    /// 
1325    /// # Returns
1326    /// 
1327    /// A [Option] is retuerned with the inner data:
1328    /// 
1329    /// * [Option::Some] with the thread name [String] that have quit.
1330    /// 
1331    /// * [Option::None] indicating that everthing is fine.
1332    pub 
1333    fn check_thread_status(&self) -> Option<String>
1334    {
1335        for thread in self.threads.as_ref().unwrap().iter()
1336        {
1337            if let None = thread.thread_flag.upgrade()
1338            {
1339                return Some(thread.hndl.thread().name().unwrap().to_string());
1340            }
1341        }
1342
1343        return None;
1344    }
1345}
1346
1347#[cfg(test)]
1348mod tests
1349{
1350    use std::{sync::mpsc::{self, RecvTimeoutError, Sender}, time::{Duration, Instant}};
1351
1352    use crate::{periodic_task::sync_tasks::{PeriodicTask, PeriodicTaskResult, PeriodicTaskTime, SyncPeriodicTasks}, AbsoluteTime, RelativeTime};
1353
1354    #[derive(Debug)]
1355    struct TaskStruct1
1356    {
1357        a1: u64,
1358        s: Sender<u64>,
1359    }
1360
1361    impl TaskStruct1
1362    {
1363        fn new(a1: u64, s: Sender<u64>) -> Self
1364        {
1365            return Self{ a1: a1, s };
1366        }
1367    }
1368
1369    impl PeriodicTask for TaskStruct1
1370    {
1371        fn exec(&mut self) -> PeriodicTaskResult
1372        {
1373            println!("taskstruct1 val: {}", self.a1);
1374
1375            let _ = self.s.send(self.a1);
1376
1377            return PeriodicTaskResult::Ok;
1378        }
1379    }
1380
1381    #[derive(Debug)]
1382    struct TaskStruct2
1383    {
1384        a1: u64,
1385        s: Sender<u64>,
1386    }
1387
1388    impl TaskStruct2
1389    {
1390        fn new(a1: u64, s: Sender<u64>) -> Self
1391        {
1392            return Self{ a1: a1, s };
1393        }
1394    }
1395
1396    impl PeriodicTask for TaskStruct2
1397    {
1398        fn exec(&mut self) -> PeriodicTaskResult
1399        {
1400            println!("taskstruct2 val: {}", self.a1);
1401
1402            self.s.send(self.a1).unwrap();
1403
1404            return PeriodicTaskResult::TaskReSchedule(PeriodicTaskTime::exact_time(AbsoluteTime::now() + RelativeTime::new_time(2, 0)));
1405        }
1406    }
1407
1408    #[test]
1409    fn test1_absolute_simple()
1410    {
1411        let s = SyncPeriodicTasks::new(1.try_into().unwrap()).unwrap();
1412
1413        let (send, recv) = mpsc::channel::<u64>();
1414
1415        let task1 = TaskStruct1::new(2, send);
1416        let task1_ptt = PeriodicTaskTime::exact_time(AbsoluteTime::now() + RelativeTime::new_time(3, 0));
1417        let task1_guard = s.add("task1", task1, task1_ptt).unwrap();
1418
1419        println!("added");
1420        let val = recv.recv();
1421
1422        println!("{:?}", val);
1423
1424        drop(task1_guard);
1425    }
1426
1427    #[test]
1428    fn test1_relative_simple()
1429    {
1430        let s = SyncPeriodicTasks::new(1.try_into().unwrap()).unwrap();
1431
1432        let (send, recv) = mpsc::channel::<u64>();
1433
1434        let task1 = TaskStruct1::new(2, send);
1435        let task1_ptt = PeriodicTaskTime::interval(RelativeTime::new_time(1, 0));
1436        let task1_guard = s.add("task1", task1, task1_ptt).unwrap();
1437        
1438        let mut s = Instant::now();
1439
1440        for i in 0..3
1441        {
1442            let val = recv.recv().unwrap();
1443
1444            let e = s.elapsed();
1445            s = Instant::now();
1446
1447            println!("{}: {:?} {:?} {}", i, val, e, e.as_micros());
1448            
1449            assert!(999000 < e.as_micros() && e.as_micros() < 10001200);
1450            assert_eq!(val, 2);
1451        }
1452
1453        drop(task1_guard);
1454
1455        std::thread::sleep(Duration::from_millis(100));
1456
1457        return;
1458    }
1459    
1460    #[test]
1461    fn test1_relative_resched_to_abs()
1462    {
1463        let s = SyncPeriodicTasks::new(1.try_into().unwrap()).unwrap();
1464
1465        let (send, recv) = mpsc::channel::<u64>();
1466
1467        let task1 = TaskStruct2::new(2, send);
1468        let task1_ptt = PeriodicTaskTime::interval(RelativeTime::new_time(1, 0));
1469        let task1_guard = s.add("task1", task1, task1_ptt).unwrap();
1470
1471        let s = Instant::now();
1472        match recv.recv_timeout(Duration::from_millis(1150))
1473        {
1474            Ok(rcv_a) => 
1475            {
1476                let e = s.elapsed();
1477                println!("{:?} {}", e, e.as_micros());
1478                assert_eq!(rcv_a, 2);
1479                assert!(999051 < e.as_micros() && e.as_micros() < 1000551);
1480            },
1481            Err(RecvTimeoutError::Timeout) => 
1482                panic!("tineout"),
1483            Err(e) =>
1484                panic!("{}", e),
1485        }
1486
1487        let s = Instant::now();
1488        match recv.recv_timeout(Duration::from_millis(2100))
1489        {
1490            Ok(rcv_a) => 
1491            {
1492                let e = s.elapsed();
1493                println!("{:?} {}", e, e.as_micros());
1494                assert_eq!(rcv_a, 2);
1495                assert!(1999642 < e.as_micros() && e.as_micros() < 2000342);
1496            },
1497            Err(RecvTimeoutError::Timeout) => 
1498                panic!("tineout"),
1499            Err(e) =>
1500                panic!("{}", e),
1501        }
1502
1503        let s = Instant::now();
1504        match recv.recv_timeout(Duration::from_millis(2100))
1505        {
1506            Ok(rcv_a) => 
1507            {
1508                let e = s.elapsed();
1509                println!("{:?} {}", e, e.as_micros());
1510                assert_eq!(rcv_a, 2);
1511                assert!(1999642 < e.as_micros() && e.as_micros() < 2000342);
1512            },
1513            Err(RecvTimeoutError::Timeout) => 
1514                panic!("tineout"),
1515            Err(e) =>
1516                panic!("{}", e),
1517        }
1518
1519        drop(task1_guard);
1520
1521        std::thread::sleep(Duration::from_millis(100));
1522
1523        return;
1524    }
1525
1526    #[test]
1527    fn test1_relative_simple_resched()
1528    {
1529        let s = SyncPeriodicTasks::new(1.try_into().unwrap()).unwrap();
1530
1531        let (send, recv) = mpsc::channel::<u64>();
1532
1533        let task1 = TaskStruct1::new(2, send);
1534        let task1_ptt = 
1535            PeriodicTaskTime::interval(
1536                RelativeTime::new_time(1, 0)
1537            );
1538        let task1_guard = s.add("task1", task1, task1_ptt).unwrap();
1539        
1540        let mut s = Instant::now();
1541
1542        for i in 0..3
1543        {
1544            let val = recv.recv().unwrap();
1545
1546            let e = s.elapsed();
1547            s = Instant::now();
1548
1549            println!("{}: {:?} {:?} {}", i, val, e, e.as_micros());
1550            
1551            assert!(999000 < e.as_micros() && e.as_micros() < 10001200);
1552            assert_eq!(val, 2);
1553        }
1554
1555        task1_guard
1556            .reschedule_task(
1557                PeriodicTaskTime::exact_time(AbsoluteTime::now() + RelativeTime::new_time(2, 0))
1558            )
1559            .unwrap();
1560
1561        s = Instant::now();
1562        let val = recv.recv().unwrap();
1563        let e = s.elapsed();
1564
1565        println!("resched: {:?} {:?} {}", val, e, e.as_micros());
1566
1567        assert!(1999000 < e.as_micros() && e.as_micros() < 2000560);
1568
1569        let val = recv.recv_timeout(Duration::from_secs(3));
1570
1571        assert_eq!(val.is_err(), true);
1572        assert_eq!(val.err().unwrap(), RecvTimeoutError::Timeout);
1573
1574        drop(task1_guard);
1575
1576        std::thread::sleep(Duration::from_millis(100));
1577
1578        return;
1579    }
1580
1581    #[test]
1582    fn test1_relative_simple_cancel()
1583    {
1584        let s = SyncPeriodicTasks::new(1.try_into().unwrap()).unwrap();
1585
1586        let (send, recv) = mpsc::channel::<u64>();
1587
1588        let task1 = TaskStruct1::new(0, send.clone());
1589        let task1_ptt = 
1590            PeriodicTaskTime::interval(RelativeTime::new_time(1, 0));
1591
1592        let task2 = TaskStruct1::new(1, send.clone());
1593        let task2_ptt = 
1594            PeriodicTaskTime::interval(RelativeTime::new_time(2, 0));
1595
1596        let task3 = TaskStruct1::new(2, send);
1597        let task3_ptt = 
1598            PeriodicTaskTime::interval(RelativeTime::new_time(0, 500_000_000));
1599
1600        let task1_guard = s.add("task1", task1, task1_ptt).unwrap();
1601        let task2_guard = s.add("task2", task2, task2_ptt).unwrap();
1602        let task3_guard = s.add("task3", task3, task3_ptt).unwrap();
1603
1604        let mut a_cnt: [u8; 3] = [0_u8; 3];
1605
1606        let end = AbsoluteTime::now() + RelativeTime::new_time(5, 100_000_000);
1607
1608        while AbsoluteTime::now() < end
1609        {
1610            match recv.recv_timeout(Duration::from_millis(1))
1611            {
1612                Ok(rcv_a) => 
1613                    a_cnt[rcv_a as usize] += 1,
1614                Err(RecvTimeoutError::Timeout) => 
1615                    continue,
1616                Err(e) =>
1617                    panic!("{}", e),
1618            }
1619
1620            
1621        }
1622
1623        assert_eq!(a_cnt[0], 5);
1624        assert_eq!(a_cnt[1], 2);
1625        assert_eq!(a_cnt[2], 10);
1626
1627        // drop task #3
1628        task3_guard.suspend_task().unwrap();
1629
1630
1631        let end = AbsoluteTime::now() + RelativeTime::new_time(5, 100_000_000);
1632
1633        while AbsoluteTime::now() < end
1634        {
1635            match recv.recv_timeout(Duration::from_millis(1))
1636            {
1637                Ok(rcv_a) => 
1638                    a_cnt[rcv_a as usize] += 1,
1639                Err(RecvTimeoutError::Timeout) => 
1640                    continue,
1641                Err(e) =>
1642                    panic!("{}", e),
1643            }
1644
1645            
1646        }
1647
1648        assert_eq!(a_cnt[0] > 5, true);
1649        assert_eq!(a_cnt[1] > 2, true);
1650        assert!((a_cnt[2] == 10 || a_cnt[2] == 11));
1651
1652        drop(task1_guard);
1653        drop(task2_guard);
1654        drop(task3_guard);
1655
1656        let end = AbsoluteTime::now() + RelativeTime::new_time(5, 100_000_000);
1657
1658        while AbsoluteTime::now() < end
1659        {
1660            match recv.recv_timeout(Duration::from_millis(1))
1661            {
1662                Ok(rcv_a) => 
1663                    a_cnt[rcv_a as usize] += 1,
1664                Err(RecvTimeoutError::Timeout) => 
1665                    continue,
1666                Err(_) =>
1667                    break,
1668            }
1669
1670            
1671        }
1672
1673        assert_eq!(AbsoluteTime::now() < end, true);
1674
1675        
1676
1677        return;
1678    }
1679
1680    // freebsd text failed with  src/periodic_task/sync_tasks.rs:1784:9 left: 6
1681    #[test]
1682    fn test2_multithread_1()
1683    {
1684        let s = SyncPeriodicTasks::new(2.try_into().unwrap()).unwrap();
1685
1686        let (send, recv) = mpsc::channel::<u64>();
1687
1688        let task1 = TaskStruct1::new(0, send.clone());
1689        let task1_ptt = 
1690            PeriodicTaskTime::interval(RelativeTime::new_time(1, 0));
1691
1692        let task2 = TaskStruct1::new(1, send.clone());
1693        let task2_ptt = 
1694            PeriodicTaskTime::interval(RelativeTime::new_time(2, 0));
1695
1696        let task3 = TaskStruct1::new(2, send.clone());
1697        let task3_ptt = 
1698            PeriodicTaskTime::interval(RelativeTime::new_time(0, 500_000_000));
1699
1700        let task4 = TaskStruct1::new(3, send.clone());
1701        let task4_ptt = 
1702            PeriodicTaskTime::interval(RelativeTime::new_time(0, 200_000_000));
1703
1704        let task5 = TaskStruct1::new(4, send.clone());
1705        let task5_ptt = 
1706            PeriodicTaskTime::exact_time(AbsoluteTime::now() + RelativeTime::new_time(5, 0));
1707
1708        let task1_guard = s.add("task1", task1, task1_ptt).unwrap();
1709        let task2_guard = s.add("task2", task2, task2_ptt).unwrap();
1710        let task3_guard = s.add("task3", task3, task3_ptt).unwrap();
1711        let task4_guard = s.add("task4", task4, task4_ptt).unwrap();
1712        let task5_guard = s.add("task5", task5, task5_ptt).unwrap();
1713
1714
1715        let mut a_cnt: [u8; 5] = [0_u8; 5];
1716
1717        let end = AbsoluteTime::now() + RelativeTime::new_time(5, 500_000_000);
1718
1719        while AbsoluteTime::now() < end
1720        {
1721            match recv.recv_timeout(Duration::from_millis(1))
1722            {
1723                Ok(rcv_a) => 
1724                    a_cnt[rcv_a as usize] += 1,
1725                Err(RecvTimeoutError::Timeout) => 
1726                    continue,
1727                Err(e) =>
1728                    panic!("{}", e),
1729            }
1730
1731            
1732        }
1733
1734        println!("{:?}", a_cnt);
1735
1736        assert!(a_cnt[0] == 5);
1737        assert!(a_cnt[1] == 2);
1738        assert!((a_cnt[2] == 10 || a_cnt[2] == 11));
1739        assert!(a_cnt[3] == 27);
1740        assert!(a_cnt[4] == 1);
1741
1742        task5_guard.reschedule_task(PeriodicTaskTime::exact_time(AbsoluteTime::now() + RelativeTime::new_time(0, 500_000_000))).unwrap();
1743
1744        let end = AbsoluteTime::now() + RelativeTime::new_time(0, 600_000_000);
1745
1746        while AbsoluteTime::now() < end
1747        {
1748            match recv.recv_timeout(Duration::from_millis(1))
1749            {
1750                Ok(rcv_a) => 
1751                    a_cnt[rcv_a as usize] += 1,
1752                Err(RecvTimeoutError::Timeout) => 
1753                    continue,
1754                Err(e) =>
1755                    panic!("{}", e),
1756            }
1757        }
1758
1759        println!("{:?}", a_cnt);
1760        assert!(a_cnt[4] == 2);
1761
1762        drop(task5_guard);
1763        drop(task4_guard);
1764        drop(task3_guard);
1765        drop(task2_guard);
1766
1767        let end = AbsoluteTime::now() + RelativeTime::new_time(2, 1000);
1768
1769        while AbsoluteTime::now() < end
1770        {
1771            match recv.recv_timeout(Duration::from_millis(1))
1772            {
1773                Ok(rcv_a) => 
1774                    a_cnt[rcv_a as usize] += 1,
1775                Err(RecvTimeoutError::Timeout) => 
1776                    continue,
1777                Err(e) =>
1778                    panic!("{}", e),
1779            }
1780        }
1781
1782
1783        println!("{:?}", a_cnt);
1784        assert_eq!(a_cnt[4], 2);
1785        assert_eq!(a_cnt[0], 8);
1786
1787        drop(task1_guard);
1788
1789        std::thread::sleep(Duration::from_millis(10));
1790        return;
1791    }
1792}