timer_deque_rs/periodic_task/sync_tasks.rs
1/*-
2 * timer-deque-rs - a Rust crate which provides timer and timer queues based on target OS
3 * functionality.
4 *
5 * Copyright (C) 2025 Aleksandr Morozov alex@nixd.org
6 * 4neko.org alex@4neko.org
7 *
8 * The timer-rs crate can be redistributed and/or modified
9 * under the terms of either of the following licenses:
10 *
11 * 1. the Mozilla Public License Version 2.0 (the “MPL”) OR
12 *
13 * 2. The MIT License (MIT)
14 *
15 * 3. EUROPEAN UNION PUBLIC LICENCE v. 1.2 EUPL © the European Union 2007, 2016
16 */
17
18use std::
19{
20 cell::OnceCell,
21 cmp,
22 collections::HashMap,
23 fmt,
24 num::NonZeroUsize,
25 os::fd::{AsRawFd, RawFd},
26 sync::
27 {
28 Arc, Mutex, TryLockError, Weak, atomic::{AtomicBool, Ordering}, mpsc::{self}
29 },
30 thread::JoinHandle,
31 time::Duration
32};
33
34use crossbeam_deque::{Injector, Steal};
35use rand::random_range;
36
37use crate::
38{
39 AbsoluteTime,
40 FdTimerCom,
41 RelativeTime,
42 TimerPoll,
43 TimerReadRes,
44 error::{TimerError, TimerErrorType, TimerResult},
45 map_timer_err, timer_err,
46 timer_portable::
47 {
48 PollEventType, PolledTimerFd, TimerExpMode, TimerFlags, TimerType, poll::PollInterrupt, timer::TimerFd
49 }
50};
51
/// A result of the task execution which is returned by the user task.
/// See the description of each variant.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum PeriodicTaskResult
{
    /// The task execution was successful. No errors.
    Ok,

    /// Cancels (but does not remove) the task due to an error or for any other reason.
    /// By returning this result the timer is unset to stop generating events.
    /// However, if the timer has timed out again before the `CancelTask` request
    /// was processed, the task will be called again. It is not necessary to return
    /// `CancelTask` again in that case, just return [PeriodicTaskResult::Ok]
    /// and exit immediately.
    CancelTask,

    /// A request to reschedule the task to a new time or interval. The `0` argument
    /// should contain the new time. The timer mode can be changed.
    TaskReSchedule(PeriodicTaskTime)
}
71
/// A trait which should be implemented by the instance which will be added as a `task`.
///
/// * the `instance` must implement [Send] because it may be moved to another thread.
///
/// * it is not necessary for the `instance` to implement [Sync] because the `instance`
///   is stored behind a mutex and is only executed while that mutex is held.
///
/// A `task` should never block the thread i.e. call functions which may block for a
/// long period of time!
pub trait PeriodicTask: Send + 'static
{
    /// The task entry point. The task should return a [PeriodicTaskResult] which
    /// tells the executor whether to keep, cancel or reschedule the task.
    fn exec(&mut self) -> PeriodicTaskResult;
}
86
/// An alias for the boxed, dynamically dispatched [PeriodicTask] instance.
pub type PeriodicTaskHndl = Box<dyn PeriodicTask>;
89
90
/// The commands of the global FIFO queue.
///
/// For all variants, the last item, an [Option] of [mpsc::Sender], is used for the
/// result feedback. If it is set to [Option::None] the result will not be reported.
#[derive(Debug)]
pub(crate) enum GlobalTasks
{
    /// Adds a task to the task system. Item `0` is the task name, item `1` is the timer
    /// time and item `2` is the task instance. If an error happens it is reported over
    /// the item `3` and the task is not added.
    AddTask( String, PeriodicTaskTime, Arc<PeriodicTaskGuardInner>, Option<mpsc::Sender<TimerResult<()>>> ),

    /// Removes the task from the system.
    RemoveTask( Arc<PeriodicTaskGuardInner>, Option<mpsc::Sender<TimerResult<()>>> ),

    /// Changes the timer value of the task.
    ReschedTask( Weak<PeriodicTaskGuardInner>, PeriodicTaskTime, Option<mpsc::Sender<TimerResult<()>>> ),

    /// Suspends the task but does not remove it. If the task which is planned to be
    /// suspended is already in the exec queue, it will still be executed.
    SuspendTask( Weak<PeriodicTaskGuardInner>, Option<mpsc::Sender<TimerResult<()>>> ),

    /// Resumes the task by re-enabling its timer.
    ResumeTask( Weak<PeriodicTaskGuardInner>, Option<mpsc::Sender<TimerResult<()>>> ),
}
115
/// The commands of the local (per-thread) task queue.
#[derive(Debug)]
pub enum ThreadTask
{
    /// Execute the task referenced by the ticket.
    TaskExec( Arc<NewTaskTicket> )
}
123
/// A ticket which is assigned to a specific task in order to send the same task to the
/// same thread which has already been assigned to it. If only one thread is allocated,
/// this is not really necessary (will be optimized in future).
#[derive(Debug)]
pub struct NewTaskTicket
{
    /// A handler of the thread to which the task is assigned.
    task_thread: Arc<ThreadHandler>,

    /// The task. A weak reference is used in order to determine if the task was
    /// removed and the exec is no longer necessary.
    ptgi: Weak<PeriodicTaskGuardInner>,
}
137
138impl NewTaskTicket
139{
140 fn new(task_thread: Arc<ThreadHandler>, ptgi: Weak<PeriodicTaskGuardInner>) -> Self
141 {
142 return
143 Self
144 {
145 task_thread:
146 task_thread,
147 ptgi:
148 ptgi
149 };
150 }
151
152 /// Sends the task on exec to the local queue of the thread.
153 fn send_task(this: Arc<NewTaskTicket>, task_rep_count: u64, thread_hndl_cnt: usize)
154 {
155 let strong_cnt = Arc::strong_count(&this);
156 let thread = this.task_thread.clone();
157
158 for _ in 0..task_rep_count
159 {
160 thread.send_task(this.clone(), strong_cnt < 2 && thread_hndl_cnt > 1);
161 }
162 }
163}
164
/// The internal structure which is stored in pair with the timer's FD. Contains the
/// necessary references and values.
#[derive(Debug)]
pub(crate) struct PeriodicTaskTicket
{
    /// A task name.
    task_name: String,

    /// The task's timer, registered in the timer poll.
    sync_timer: PolledTimerFd<TimerFd>,

    /// A time which was set to the timer. Needed in case the timer was cancelled by
    /// the OS or suspended and has to be armed again.
    ptt: PeriodicTaskTime,

    /// A [Weak] reference to the current [NewTaskTicket] which identifies to which thread
    /// the task exec was assigned. The [Arc] of [NewTaskTicket] is cloned for each task
    /// exec round. If it is not `upgradable`, the task is no longer assigned to a thread.
    weak_ticket: Weak<NewTaskTicket>,

    /// A [Weak] reference to the [PeriodicTaskGuardInner]. If this reference is not
    /// `upgradable` then the task is no longer valid.
    ptg: Weak<PeriodicTaskGuardInner>,
}
187
188impl Ord for PeriodicTaskTicket
189{
190 fn cmp(&self, other: &Self) -> cmp::Ordering
191 {
192 return
193 self
194 .sync_timer
195 .get_inner()
196 .as_raw_fd()
197 .cmp(&other.sync_timer.get_inner().as_raw_fd());
198 }
199}
200
201impl PartialOrd for PeriodicTaskTicket
202{
203 fn partial_cmp(&self, other: &Self) -> Option<cmp::Ordering>
204 {
205 return Some(self.cmp(other));
206 }
207}
208
209impl PartialEq for PeriodicTaskTicket
210{
211 fn eq(&self, other: &Self) -> bool
212 {
213 return
214 self.task_name == other.task_name &&
215 self.sync_timer.get_inner().as_raw_fd() == other.sync_timer.get_inner().as_raw_fd();
216 }
217}
218
219impl Eq for PeriodicTaskTicket {}
220
221impl PeriodicTaskTicket
222{
223 fn new(task_name: String, ptt: PeriodicTaskTime, ptg: Arc<PeriodicTaskGuardInner>, poll: &TimerPoll) -> TimerResult<Self>
224 {
225 // create timer and add to poll
226 let sync_timer =
227 TimerFd::new(task_name.clone().into(), TimerType::CLOCK_REALTIME,
228 TimerFlags::TFD_CLOEXEC | TimerFlags::TFD_NONBLOCK)
229 .map_err(|e|
230 map_timer_err!(TimerErrorType::TimerError(e.get_errno()), "cannot setup timer for task: '{}'", task_name)
231 )
232 .and_then(|timer|
233 poll.add(timer)
234 )?;
235
236 let ptt =
237 Self
238 {
239 task_name: task_name,
240 sync_timer: sync_timer,
241 ptt: ptt,
242 weak_ticket: Weak::new(),
243 ptg: Arc::downgrade(&ptg),
244 };
245
246 ptt.set_timer()?;
247
248 return Ok(ptt);
249 }
250
251 #[inline]
252 fn set_timer(&self) -> TimerResult<()>
253 {
254 return self.get_timer_time().set_timer(self.sync_timer.get_inner());
255 }
256
257 #[inline]
258 fn unset_timer(&self) -> TimerResult<()>
259 {
260 return
261 self
262 .sync_timer
263 .get_inner()
264 .get_timer()
265 .unset_time()
266 .map_err(|e|
267 map_timer_err!(TimerErrorType::TimerError(e.get_errno()), "unsetting timer '{}' returned error: {}", self.sync_timer, e)
268 );
269 }
270
271 fn update_task_time(&mut self, ptt_new: PeriodicTaskTime) -> TimerResult<()>
272 {
273 self.ptt = ptt_new;
274
275 return self.set_timer();
276 }
277
278 fn get_task_guard(&self) -> TimerResult<Arc<PeriodicTaskGuardInner>>
279 {
280 return
281 self
282 .ptg
283 .upgrade()
284 .ok_or_else(||
285 map_timer_err!(TimerErrorType::ReferenceGone, "task: '{}' reference to timer has gone",
286 self.task_name)
287 );
288 }
289
290 #[inline]
291 fn get_timer_time(&self) -> &PeriodicTaskTime
292 {
293 return &self.ptt;
294 }
295}
296
/// An instance which is returned by the [SyncPeriodicTasks::add] which guards the
/// task and allows to control its state. If the task is no longer needed, the instance
/// can be dropped and the task will be removed from the system.
#[derive(Debug)]
pub struct PeriodicTaskGuard
{
    /// A task name.
    task_name: String,

    /// An [Arc] to the [PeriodicTaskGuardInner] which contains the task.
    /// Normally this is the base instance i.e. reference and if it is dropped
    /// it indicates to the task `executor` that the instance is no longer valid.
    /// The auto-removal is performed by sending the `command` over the global
    /// queue.
    ///
    /// The [Option] is needed to `take` the instance on drop.
    guard: Option<Arc<PeriodicTaskGuardInner>>,

    /// A cloned reference to the executor spawner which is used to send the
    /// commands to the global queue.
    spt: Arc<SyncPeriodicTasksInner>
}
319
320impl Drop for PeriodicTaskGuard
321{
322 fn drop(&mut self)
323 {
324 let guard = self.guard.take().unwrap();
325
326 let _ = self.spt.send_global_cmd(GlobalTasks::RemoveTask(guard, None));
327
328 return;
329 }
330}
331
332impl PeriodicTaskGuard
333{
334 /// Requests the task rescheduling - changine the timer time or mode.
335 /// The `task` is represented by the calling instance.
336 ///
337 /// This function blocks
338 /// the current thread for the maximum (in worst case) 5 seconds which is a
339 /// timeout for feedback reception.
340 ///
341 /// # Arguments
342 ///
343 /// * `ptt` - a new time to be set to timer.
344 ///
345 /// # Returns
346 ///
347 /// A [Result] as alias [TimerResult] is returned.
348
349 /// In case if error is retuned, the operation should be considered as not completed
350 /// correctly and the executor instance is poisoned i.e a bug happened.
351 ///
352 /// The common errors may be retuned:
353 ///
354 /// * [TimerErrorType::MpscTimeout] - a feedback (with the result) reception timeout.
355 /// Probably this is because the task executor is too busy.
356 ///
357 /// * [TimerErrorType::TimerError] - with the timer error.
358 ///
359 /// * [TimerErrorType::NotFound] - if instance was not found in the internal records.
360 /// Should not happen. If appears, then probably this is a bug.
361 pub
362 fn reschedule_task(&self, ptt: PeriodicTaskTime) -> TimerResult<()>
363 {
364 let weak_ptgi = Arc::downgrade(self.guard.as_ref().unwrap());
365
366 let (snd, rcv) = mpsc::channel::<Result<(), TimerError>>();
367
368 self.spt.send_global_cmd(GlobalTasks::ReschedTask(weak_ptgi, ptt, Some(snd)))?;
369
370 return
371 rcv
372 .recv_timeout(Duration::from_secs(10))
373 .map_err(|e|
374 map_timer_err!(TimerErrorType::MpscTimeout, "reschedule_task(), task name: '{}', MPSC rcv timeout error: '{}'",
375 self.task_name, e)
376 )?;
377 }
378
379 /// Requests to suspent the current `task`.
380 ///
381 /// This function blocks
382 /// the current thread for the maximum (in worst case) 5 seconds which is a
383 /// timeout for feedback reception.
384 ///
385 /// # Returns
386 ///
387 /// A [Result] as alias [TimerResult] is returned.
388 ///
389 /// In case if error is retuned, the operation should be considered as not completed
390 /// correctly and the executor instance is poisoned i.e a bug happened.
391 ///
392 /// The common errors may be retuned:
393 ///
394 /// * [TimerErrorType::MpscTimeout] - a feedback (with the result) reception timeout.
395 /// Probably this is because the task executor is too busy.
396 ///
397 /// * [TimerErrorType::TimerError] - with the timer error.
398 pub
399 fn suspend_task(&self) -> TimerResult<()>
400 {
401 let weak_ptgi = Arc::downgrade(self.guard.as_ref().unwrap());
402
403 let (snd, rcv) = mpsc::channel::<Result<(), TimerError>>();
404
405 self.spt.send_global_cmd(GlobalTasks::SuspendTask(weak_ptgi, Some(snd)))?;
406
407 return
408 rcv
409 .recv_timeout(Duration::from_secs(10))
410 .map_err(|e|
411 map_timer_err!(TimerErrorType::MpscTimeout, "suspend_task(), task name: '{}', MPSC rcv timeout error: '{}'",
412 self.task_name, e)
413 )?;
414 }
415
416 /// Requests to resume the task from the suspend state.
417 ///
418 /// This function blocks
419 /// the current thread for the maximum (in worst case) 5 seconds which is a
420 /// timeout for feedback reception.
421 ///
422 /// # Returns
423 ///
424 /// A [Result] as alias [TimerResult] is returned.
425 ///
426 /// In case if error is retuned, the operation should be considered as not completed
427 /// correctly and the executor instance is poisoned i.e a bug happened.
428 ///
429 /// The common errors may be retuned:
430 ///
431 /// * [TimerErrorType::MpscTimeout] - a feedback (with the result) reception timeout.
432 /// Probably this is because the task executor is too busy.
433 ///
434 /// * [TimerErrorType::TimerError] - with the timer error.
435 pub
436 fn resume_task(&self) -> TimerResult<()>
437 {
438 let weak_ptgi = Arc::downgrade(self.guard.as_ref().unwrap());
439
440 let (snd, rcv) = mpsc::channel::<Result<(), TimerError>>();
441
442 self.spt.send_global_cmd(GlobalTasks::ResumeTask(weak_ptgi, Some(snd)))?;
443
444 return
445 rcv
446 .recv_timeout(Duration::from_secs(10))
447 .map_err(|e|
448 map_timer_err!(TimerErrorType::MpscTimeout, "resume_task(), task name: '{}', MPSC rcv timeout error: '{}'",
449 self.task_name, e)
450 )?;
451 }
452}
453
/// Programs the task's timer to a specific time and mode. This instance is
/// a wrapper around the [TimerExpMode], as that struct requires a generic to
/// be specified which defines the type of the time.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum PeriodicTaskTime
{
    /// The provided time is an absolute time in the future.
    Absolute(TimerExpMode<AbsoluteTime>),

    /// The provided time is relative to the current time.
    Relative(TimerExpMode<RelativeTime>),
}
466
467impl From<TimerExpMode<AbsoluteTime>> for PeriodicTaskTime
468{
469 fn from(value: TimerExpMode<AbsoluteTime>) -> Self
470 {
471 return Self::Absolute(value);
472 }
473}
474
475impl From<TimerExpMode<RelativeTime>> for PeriodicTaskTime
476{
477 fn from(value: TimerExpMode<RelativeTime>) -> Self
478 {
479 return Self::Relative(value);
480 }
481}
482
483impl fmt::Display for PeriodicTaskTime
484{
485 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result
486 {
487 match self
488 {
489 Self::Absolute(t) =>
490 write!(f, "{}", t),
491 Self::Relative(t) =>
492 write!(f, "{}", t),
493 }
494 }
495}
496
497impl PeriodicTaskTime
498{
499 /// The timer is set to time up once and for the specific `absolute` time in future
500 /// (to current realclock time).
501 #[inline]
502 pub
503 fn exact_time(abs_time: AbsoluteTime) -> Self
504 {
505 return Self::Absolute(TimerExpMode::<AbsoluteTime>::new_oneshot(abs_time));
506 }
507
508 /// The timer is set to time up in the interval for the `relative` time.
509 #[inline]
510 pub
511 fn interval(rel_time: RelativeTime) -> Self
512 {
513 return Self::Relative(TimerExpMode::<RelativeTime>::new_interval(rel_time));
514 }
515
516 /// The timer is set to time up in the interval for the `relative` time with the initial
517 /// delay.
518 ///
519 /// # Arguments
520 ///
521 /// * `start_del_time` - a [RelativeTime] of the initial delay.
522 ///
523 /// * `rel_int_time` - a [RelativeTime] of the interval.
524 #[inline]
525 pub
526 fn interval_with_start_delay(start_del_time: RelativeTime, rel_int_time: RelativeTime) -> Self
527 {
528 return Self::Relative(TimerExpMode::<RelativeTime>::new_interval_with_init_delay(start_del_time, rel_int_time));
529 }
530
531 /// Sets the `timer_fd` instance to the specific value stored in the current instance.
532 fn set_timer(&self, timer_fd: &TimerFd) -> TimerResult<()>
533 {
534 match *self
535 {
536 Self::Absolute(timer_exp_mode) =>
537 return
538 timer_fd
539 .get_timer()
540 .set_time(timer_exp_mode)
541 .map_err(|e|
542 map_timer_err!(TimerErrorType::TimerError(e.get_errno()), "cannot set time '{}' for timer: '{}'", timer_exp_mode, timer_fd )
543 ),
544 Self::Relative(timer_exp_mode) =>
545 return
546 timer_fd
547 .get_timer()
548 .set_time(timer_exp_mode)
549 .map_err(|e|
550 map_timer_err!(TimerErrorType::TimerError(e.get_errno()), "cannot set time '{}' for timer: '{}'", timer_exp_mode, timer_fd )
551 ),
552 }
553 }
554}
555
556
/// An instance which contains the task name and the task itself.
pub(crate) struct PeriodicTaskGuardInner
{
    /// A task name.
    task_name: String,

    /// The task itself. At the moment it is wrapped into a mutex in order to provide
    /// the mutability, because the current instance is wrapped into an [Arc].
    /// But the task is never executed from different threads at once, and the mutex
    /// should be replaced with something that is [Send] and can provide a
    /// mutable reference.
    task: Mutex<PeriodicTaskHndl>,
}
569
570impl fmt::Debug for PeriodicTaskGuardInner
571{
572 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result
573 {
574 f.debug_struct("PeriodicTaskGuardInner").field("task_name", &self.task_name).finish()
575 }
576}
577
578impl fmt::Display for PeriodicTaskGuardInner
579{
580 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result
581 {
582 write!(f, "task name: '{}'", self.task_name)
583 }
584}
585
586impl PeriodicTaskGuardInner
587{
588 fn new(task_name: String, task_inst: PeriodicTaskHndl) -> TimerResult<Self>
589 {
590 return Ok(Self { task_name: task_name, task: Mutex::new(task_inst) });
591 }
592
593
594}
595
/// Internal structure for every worker thread.
struct ThreadWorker
{
    /// A thread name, not a task name.
    thread_name: String,

    /// An [Injector] queue for the global tasks received from other threads.
    global_task_injector: Arc<Injector<GlobalTasks>>,

    /// An [Injector] queue for the local (specific to the current thread) tasks.
    local_thread_inj: Arc<Injector<ThreadTask>>,

    /// A flag which is used to control the thread's loop. It is set to `false`
    /// when the thread should exit.
    thread_run_flag: Arc<AtomicBool>,

    /// A shared instance of the [SharedPeriodicTasks] which holds all information
    /// about the tasks and the FD polling. The thread which acquired the mutex first
    /// processes the `global_task_injector` queue and polls the timers for the events
    /// which are then distributed to the threads.
    spti: Arc<Mutex<SharedPeriodicTasks>>,

    /// An index (in the thread pool) of the last thread to which a task was assigned.
    thread_last_id: usize,

    /// A poll interrupt signal sender.
    poll_int: PollInterrupt
}
624
625impl ThreadWorker
626{
627 fn new(thread_name: String, global_task_injector: Arc<Injector<GlobalTasks>>,
628 spti: Arc<Mutex<SharedPeriodicTasks>>, poll_int: PollInterrupt) -> TimerResult<ThreadHandler>
629 {
630 let local_thread_inj = Arc::new(Injector::<ThreadTask>::new());
631 let thread_run_flag = Arc::new(AtomicBool::new(true));
632 let thread_run_flag_weak = Arc::downgrade(&thread_run_flag);
633
634 let worker =
635 ThreadWorker
636 {
637 thread_name:
638 thread_name.clone(),
639 global_task_injector:
640 global_task_injector,
641 local_thread_inj:
642 local_thread_inj.clone(),
643 thread_run_flag:
644 thread_run_flag,
645 spti:
646 spti,
647 thread_last_id:
648 0,
649 poll_int:
650 poll_int
651 };
652
653 let thread_hndl =
654 std::thread::Builder::new()
655 .name(thread_name)
656 .spawn(|| worker.worker())
657 .map_err(|e|
658 map_timer_err!(TimerErrorType::SpawnError(e.kind()), "{}", e)
659 )?;
660
661
662 return Ok( ThreadHandler::new(thread_hndl, local_thread_inj, thread_run_flag_weak) );
663 }
664
665
666 fn worker(mut self) -> TimerResult<()>
667 {
668 // force to park to allow the main thread to initialize everything
669 std::thread::park();
670
671 while self.thread_run_flag.load(Ordering::Acquire) == true
672 {
673 // check local queue for the task or token
674 while let Steal::Success(task) = self.local_thread_inj.steal()
675 {
676 match task
677 {
678 ThreadTask::TaskExec(task_exec) =>
679 {
680 let Some(ptgi) = task_exec.ptgi.upgrade()
681 else
682 {
683 // task was removed while was in queue.
684 continue;
685 };
686
687 // call task
688 match ptgi.task.lock().unwrap().exec()
689 {
690 PeriodicTaskResult::Ok =>
691 {},
692 PeriodicTaskResult::CancelTask =>
693 {
694 self
695 .global_task_injector
696 .push(GlobalTasks::SuspendTask(task_exec.ptgi.clone(), None));
697
698 let _ = self.poll_int.aquire().map(|v| v.interrupt_drop());
699 },
700 PeriodicTaskResult::TaskReSchedule(ptt) =>
701 {
702 self
703 .global_task_injector
704 .push(GlobalTasks::ReschedTask(task_exec.ptgi.clone(), ptt, None));
705
706 let _ = self.poll_int.aquire().map(|v| v.interrupt_drop());
707 }
708 }
709
710 drop(task_exec);
711 }
712 }
713 }
714
715 let spti_lock_res = self.spti.try_lock();
716
717 if let Ok(mut task_token) = spti_lock_res
718 {
719 let thread_hndl_cnt = task_token.thread_pool.get().unwrap().len();
720
721 // check global task queue
722 while let Steal::Success(task) = self.global_task_injector.steal()
723 {
724 match task
725 {
726 GlobalTasks::AddTask(task_name, ptt, ptg, opt_err_ret) =>
727 {
728 if task_token.contains_task(&task_name) == true
729 {
730 if let Some(err_ret) = opt_err_ret
731 {
732 let err_msg =
733 map_timer_err!(TimerErrorType::Duplicate,
734 "thread: '{}', task: '{}' already exists", self.thread_name, task_name);
735
736 let _ = err_ret.send(Err(err_msg));
737 }
738
739 continue;
740 }
741
742 let period_task_ticket =
743 PeriodicTaskTicket::new(task_name.clone(), ptt, ptg, &task_token.timers_poll);
744
745 if let Err(e) = period_task_ticket
746 {
747 if let Some(err_ret) = opt_err_ret
748 {
749 let _ = err_ret.send(Err(e));
750 }
751
752 continue;
753 }
754
755 let period_task_ticket = period_task_ticket.unwrap();
756
757 task_token.insert_task(period_task_ticket);
758
759 if let Some(err_ret) = opt_err_ret
760 {
761 let _ = err_ret.send(Ok(()));
762 }
763 },
764 GlobalTasks::RemoveTask(ptg_arc, opt_err_ret) =>
765 {
766 let res = task_token.remove_task(&ptg_arc.task_name);
767
768 if let Err(e) = res
769 {
770 if let Some(err_ret) = opt_err_ret
771 {
772 let err_msg =
773 map_timer_err!(e.get_error_type(),
774 "thread: '{}', {}", self.thread_name, e.get_error_msg());
775
776 let _ = err_ret.send(Err(err_msg));
777 }
778
779 continue;
780 }
781
782 let ptt_ref = res.unwrap();
783
784 //let ptt = ptt_ref.into_inner();
785
786 // stop timer
787 if let Err(e) = ptt_ref.unset_timer()
788 {
789 if let Some(err_ret) = opt_err_ret.as_ref()
790 {
791 let _ = err_ret.send(Err(e));
792 }
793
794 continue;
795 }
796
797 drop(ptt_ref);
798
799 /*if let Some(err_ret) = opt_err_ret
800 {
801 let _ = err_ret.send(Ok(()));
802 }
803
804 if let Some(ptt_ref) = task_token.tasks.remove(&ptg_arc.task_name)
805 {
806
807 }
808 else
809 {
810 if let Some(err_ret) = opt_err_ret
811 {
812 let err_msg =
813 map_timer_err!(TimerErrorType::NotFound,
814 "thread: '{}', task: '{}' was not found", self.thread_name, ptg_arc.task_name);
815
816 let _ = err_ret.send(Err(err_msg));
817 }
818 }*/
819
820 drop(ptg_arc);
821 },
822 GlobalTasks::ReschedTask( ptg_weak, ptt, opt_err_ret ) =>
823 {
824 let Some(ptg_arc) = ptg_weak.upgrade()
825 else
826 {
827 // task was removed while was in queue.
828 continue;
829 };
830
831 // replace the time
832 /*let res_task =
833 task_token
834 .tasks
835 .get(&ptg_arc.task_name)
836 .ok_or_else(||
837 map_timer_err!(TimerErrorType::NotFound,
838 "thread: '{}', task: '{}' was not found", self.thread_name, ptg_arc.task_name)
839 );*/
840
841 let res_task = task_token.get_task_by_name(&ptg_arc.task_name);
842
843 let res =
844 match res_task
845 {
846 Ok(task) =>
847 {
848 // stop timer
849 let _ = task.unset_timer();
850
851 // update timer value and set timer
852 let res = task.update_task_time(ptt);
853
854 if let Err(e) = res
855 {
856 if let Some(err_ret) = opt_err_ret.as_ref()
857 {
858 let _ = err_ret.send(Err(e));
859 }
860
861 continue;
862 }
863
864 Ok(())
865 },
866 Err(err) =>
867 {
868 Err(err)
869 }
870 };
871
872 if let Some(err_ret) = opt_err_ret
873 {
874 let _ = err_ret.send(res);
875 }
876 },
877 GlobalTasks::SuspendTask(ptg_weak, opt_err_ret) =>
878 {
879 let Some(ptg_arc) = ptg_weak.upgrade()
880 else
881 {
882 // task was removed while was in queue.
883 continue;
884 };
885
886 /*let res_task =
887 task_token
888 .tasks
889 .get(&ptg_arc.task_name)
890 .ok_or_else(||
891 map_timer_err!(TimerErrorType::NotFound,
892 "thread: '{}', task: '{}' was not found", self.thread_name, ptg_arc.task_name)
893 );*/
894
895 let res_task = task_token.get_task_by_name(&ptg_arc.task_name);
896
897 if let Err(e) = res_task
898 {
899 if let Some(err_ret) = opt_err_ret.as_ref()
900 {
901 let _ = err_ret.send(Err(e));
902 }
903
904 continue;
905 }
906
907 let task = res_task.unwrap();
908
909 let res = task.unset_timer();
910
911 if let Some(err_ret) = opt_err_ret
912 {
913 let _ = err_ret.send(res);
914 }
915 },
916 GlobalTasks::ResumeTask(ptg_weak, opt_err_ret) =>
917 {
918 let Some(ptg_arc) = ptg_weak.upgrade()
919 else
920 {
921 // task was removed while was in queue.
922 continue;
923 };
924
925 /*let res_task =
926 task_token
927 .tasks
928 .get(&ptg_arc.task_name)
929 .ok_or_else(||
930 map_timer_err!(TimerErrorType::NotFound,
931 "thread: '{}', timer with FD: '{}' was not found", self.thread_name, ptg_arc.task_name)
932 );*/
933
934 let res_task = task_token.get_task_by_name(&ptg_arc.task_name);
935
936 if let Err(e) = res_task
937 {
938 if let Some(err_ret) = opt_err_ret.as_ref()
939 {
940 let _ = err_ret.send(Err(e));
941 }
942
943 continue;
944 }
945
946 let res = res_task.unwrap().set_timer();
947
948 if let Some(err_ret) = opt_err_ret.as_ref()
949 {
950 let _ = err_ret.send(res);
951 }
952 }
953 }
954 }
955
956 // poll
957 let Some(res) =
958 task_token
959 .timers_poll
960 .poll(Some(5000))?
961 else
962 {
963 continue;
964 };
965
966
967 for event in res
968 {
969 match event
970 {
971 PollEventType::TimerRes(timer_fd, timer_res) =>
972 {
973 // resolve fd into task
974 /*let task =
975 task_token
976 .tasks
977 .get(&timer_fd)
978 .ok_or_else(||
979 map_timer_err!(TimerErrorType::NotFound,
980 "thread: '{}', timer with FD: '{}' was not found", self.thread_name, timer_fd)
981 )?;*/
982
983 let (ptg, ptt, ticket_arc_opt, task_name) =
984 {
985 let task =
986 task_token
987 .get_task_by_fd(timer_fd)
988 .map_err(|e|
989 map_timer_err!(e.get_error_type(), "thread: '{}', {}", self.thread_name, e.get_error_msg())
990 )?;
991
992 (task.ptg.clone(), task.ptt.clone(), task.weak_ticket.upgrade(), task.task_name.clone())
993 };
994
995 /*let ptg = task.ptg.clone();
996 let ptt = task.ptt.clone();
997 // check if ticket presents, if not create
998 let ticket_arc_opt = task.weak_ticket.upgrade();
999 let task_name = task.task_name.clone();*/
1000
1001
1002
1003 // check that PeriodicTaskGuard is ok
1004 let Some(ptg_arc) = ptg.upgrade()
1005 else
1006 {
1007 //let task_name = task.task_name.clone();
1008
1009 // if Arc cannot be upgraded, then remove the task as removed
1010 //task_token.tasks.remove(&timer_fd);
1011 let _ = task_token.remove_task(&task_name);
1012
1013 // continue event handling
1014 continue;
1015 };
1016
1017 // check timer result
1018 let overflow_cnt: u64 =
1019 match timer_res
1020 {
1021 TimerReadRes::Ok(overfl) =>
1022 {
1023 overfl
1024 },
1025 TimerReadRes::Cancelled =>
1026 {
1027 // the system time was modified, resched instance
1028
1029 self
1030 .global_task_injector
1031 .push(
1032 GlobalTasks::ReschedTask(ptg.clone(), ptt.clone(), None)
1033 );
1034
1035 // timer was reset continue
1036 continue;
1037 },
1038 TimerReadRes::WouldBlock =>
1039 {
1040 // should not happen, silently ignore or panic?
1041 panic!("assertion trap: timer retuned WouldBlock, {}", ptg_arc);
1042 }
1043 };
1044
1045 // check if ticket presents, if not create
1046 // let ticket_arc_opt = task.weak_ticket.upgrade();
1047
1048
1049 let ticket =
1050 match ticket_arc_opt
1051 {
1052 Some(ticket) =>
1053 ticket,
1054 None =>
1055 {
1056 let task_thread =
1057 {
1058 /*let thread_local_hnd =
1059 task_token
1060 .thread_pool
1061 .get()
1062 .unwrap()[self.thread_last_id]
1063 .get_thread_handler();*/
1064
1065
1066
1067 self.thread_last_id = (self.thread_last_id + 1) % thread_hndl_cnt;
1068
1069 //thread_local_hnd
1070 task_token.clone_thread_handler(self.thread_last_id)
1071 };
1072
1073 let ticket =
1074 Arc::new(NewTaskTicket::new(task_thread, ptg.clone()));
1075
1076 let task =
1077 task_token
1078 .get_task_by_fd(timer_fd)
1079 .map_err(|e|
1080 map_timer_err!(e.get_error_type(), "thread: '{}', {}", self.thread_name, e.get_error_msg())
1081 )?;
1082
1083 task.weak_ticket = Arc::downgrade(&ticket);
1084
1085 ticket
1086 }
1087 };
1088
1089
1090
1091 NewTaskTicket::send_task(ticket, overflow_cnt, thread_hndl_cnt);
1092 },
1093 _ =>
1094 {
1095 // ignore
1096 },
1097 }
1098 } // for
1099 }
1100 else if let Err(TryLockError::WouldBlock) = spti_lock_res
1101 {
1102 if self.thread_run_flag.load(Ordering::Acquire) == false
1103 {
1104 return Ok(());
1105 }
1106
1107 if self.local_thread_inj.is_empty() == true
1108 {
1109 std::thread::park_timeout(Duration::from_secs(2));
1110 }
1111 }
1112
1113 } // while
1114
1115 return Ok(());
1116 }
1117}
1118
1119
/// A common instance which contains the handler of a specific thread, its local
/// task queue and a [Weak] reference to the flag which controls the thread's loop.
#[derive(Debug)]
struct ThreadHandler
{
    /// A thread join handler.
    hndl: JoinHandle<TimerResult<()>>,

    /// A local task injector (queue) of the thread.
    task_injector: Arc<Injector<ThreadTask>>,

    /// A flag which controls the thread loop. Initialized as `true` by the worker;
    /// storing `false` requests the thread to exit.
    thread_flag: Weak<AtomicBool>,
}
1134
1135impl ThreadHandler
1136{
1137 fn new(hndl: JoinHandle<TimerResult<()>>, task_injector: Arc<Injector<ThreadTask>>, thread_flag: Weak<AtomicBool>) -> Self
1138 {
1139 return
1140 Self
1141 {
1142 hndl,
1143 task_injector,
1144 thread_flag: thread_flag
1145 };
1146 }
1147
1148 fn stop(&self)
1149 {
1150 if let Some(v) = self.thread_flag.upgrade()
1151 {
1152 v.store(false, Ordering::Release);
1153 }
1154 }
1155
1156 fn unpark(&self)
1157 {
1158 self.hndl.thread().unpark();
1159 }
1160
1161 fn send_task(&self, task: Arc<NewTaskTicket>, unpark: bool)
1162 {
1163 self.task_injector.push(ThreadTask::TaskExec(task));
1164
1165 if unpark == true
1166 {
1167 self.hndl.thread().unpark();
1168 }
1169
1170 return;
1171 }
1172
1173 fn clean_local_queue(&self)
1174 {
1175 while let Steal::Success(_) = self.task_injector.steal() {}
1176
1177 return;
1178 }
1179}
1180
/// An instance which contains all information about the threads allocated for the
/// task execution, the tasks and the timer polling. The instance is shared between
/// all threads.
#[derive(Debug)]
pub struct SharedPeriodicTasks
{
    /// A list of the threads. A [OnceCell] is used to initialize the field once.
    thread_pool: OnceCell<Arc<Vec<Arc<ThreadHandler>>>>,

    /// A [HashMap] of the registered tasks. Each task is mapped to its timer FD.
    tasks_by_fd: HashMap<RawFd, PeriodicTaskTicket>,

    /// A [HashMap] which maps a task name to the timer FD of the task, i.e. to the
    /// key of the `tasks_by_fd`.
    tasks_name_to_fd: HashMap<String, RawFd>,

    /// A timer event poll.
    timers_poll: TimerPoll,
}
1197
1198
1199impl SharedPeriodicTasks
1200{
1201 fn new() -> TimerResult<Self>
1202 {
1203
1204 return Ok(
1205 Self
1206 {
1207 thread_pool: OnceCell::default(),
1208 tasks_by_fd: HashMap::new(),
1209 tasks_name_to_fd: HashMap::new(),
1210 timers_poll: TimerPoll::new()?
1211 }
1212 );
1213 }
1214
1215 fn get_task_by_fd(&mut self, timer_fd: RawFd) -> TimerResult<&mut PeriodicTaskTicket>
1216 {
1217 return
1218 self
1219 .tasks_by_fd
1220 .get_mut(&timer_fd)
1221 .ok_or_else(||
1222 map_timer_err!(TimerErrorType::NotFound, "task fd: '{}' was found but task was not found",
1223 timer_fd.as_raw_fd())
1224 );
1225 }
1226
1227 fn get_task_by_name(&mut self, task_name: &str) -> TimerResult<&mut PeriodicTaskTicket>
1228 {
1229 let res =
1230 self
1231 .tasks_name_to_fd
1232 .get(task_name)
1233 .ok_or_else(||
1234 map_timer_err!(TimerErrorType::NotFound, "task: '{}' was not found", task_name)
1235 )
1236 .map(|v|
1237 self
1238 .tasks_by_fd
1239 .get_mut(v)
1240 .ok_or_else(||
1241 map_timer_err!(TimerErrorType::NotFound, "task: '{}' fd: '{}' was found but task was not found",
1242 task_name, v)
1243 )
1244 )??;
1245
1246 return Ok(res);
1247 }
1248
1249 fn clone_thread_handler(&self, thread_last_id: usize) -> Arc<ThreadHandler>
1250 {
1251 let thread_local_hnd =
1252 self
1253 .thread_pool
1254 .get()
1255 .unwrap()[thread_last_id]
1256 .clone();
1257
1258 return thread_local_hnd;
1259 }
1260
1261 fn remove_task(&mut self, task_name: &str) -> TimerResult<PeriodicTaskTicket>
1262 {
1263 let Some(task_timer_fd) = self.tasks_name_to_fd.remove(task_name)
1264 else
1265 {
1266 timer_err!(TimerErrorType::NotFound, "task: '{}' was not found", task_name);
1267 };
1268
1269 let Some(ptt) = self.tasks_by_fd.remove(&task_timer_fd)
1270 else
1271 {
1272 timer_err!(TimerErrorType::NotFound, "task: '{}' fd: '{}' was found but task was not found",
1273 task_name, task_timer_fd);
1274 };
1275
1276 return Ok(ptt);
1277 }
1278
1279 fn contains_task(&self, task_name: &str) -> bool
1280 {
1281 return self.tasks_name_to_fd.contains_key(task_name);
1282 }
1283
1284 fn insert_task(&mut self, period_task_ticket: PeriodicTaskTicket)
1285 {
1286 let raw_fd = period_task_ticket.sync_timer.get_inner().as_raw_fd();
1287
1288 self.tasks_name_to_fd.insert(period_task_ticket.task_name.clone(), raw_fd);
1289 self.tasks_by_fd.insert(raw_fd, period_task_ticket);
1290
1291 return;
1292 }
1293}
1294
1295#[derive(Debug)]
1296pub struct SyncPeriodicTasksInner
1297{
1298 /// A [Weak] reference to the [EventFd] of the timer `poll` which is used to interrupt the polling.
1299 poll_int: PollInterrupt,
1300
1301 /// A `global` FIFO which is used to send commands to the task executor.
1302 task_injector: Arc<Injector<GlobalTasks>>,
1303}
1304
1305impl SyncPeriodicTasksInner
1306{
1307 fn send_global_cmd(&self, glob: GlobalTasks) -> TimerResult<()>
1308 {
1309 let poll_int =
1310 self.poll_int.aquire()?;
1311
1312 self.task_injector.push(glob);
1313
1314 poll_int.interrupt_drop()?;
1315
1316 return Ok(());
1317 }
1318
1319 fn clear_global_queue(&self)
1320 {
1321 while let Steal::Success(_) = self.task_injector.steal() {}
1322
1323 return;
1324 }
1325}
1326
1327/// A wrapper for the task which is described in closure.
1328struct SyncCallableOper
1329{
1330 op: Box<dyn FnMut() -> PeriodicTaskResult>,
1331}
1332
1333unsafe impl Send for SyncCallableOper {}
1334
1335impl PeriodicTask for SyncCallableOper
1336{
1337 fn exec(&mut self) -> PeriodicTaskResult
1338 {
1339 return (self.op)();
1340 }
1341}
1342
1343/// A `main` instance which spawns the task executor.
1344///
1345/// ```ignore
1346/// let spt = SyncPeriodicTasks::new(1.try_into().unwrap()).unwrap();
1347/// let task1 = TaskStruct1::new(2, send);
1348/// let task1_ptt = PeriodicTaskTime::Relative(TimerExpMode::<RelativeTime>::new_interval(RelativeTime::new_time(1, 0)));
1349/// let task1_guard = s.add("task1", task1, task1_ptt).unwrap();
1350/// ```
1351///
1352/// This instance should be kept somewhere and dropped only if the task executor WITH
1353/// all spawned tasks are no longer needed.
1354#[derive(Debug, Clone)]
1355pub struct SyncPeriodicTasks
1356{
1357 threads: Option<Arc<Vec<Arc<ThreadHandler>>>>,
1358
1359 inner: Arc<SyncPeriodicTasksInner>,
1360}
1361
1362impl Drop for SyncPeriodicTasks
1363{
1364 fn drop(&mut self)
1365 {
1366 self.inner.clear_global_queue();
1367
1368 let mut threads = self.threads.take().unwrap();
1369
1370 // stop all threads
1371 for thread in threads.iter()
1372 {
1373 thread.stop();
1374 thread.unpark();
1375 }
1376
1377 // interrupt poll
1378 let _ = self.inner.poll_int.aquire().map(|v| v.interrupt_drop());
1379
1380 for _ in 0..5
1381 {
1382 let threads_unwr =
1383 match Arc::try_unwrap(threads)
1384 {
1385 Ok(r) => r,
1386 Err(e) =>
1387 {
1388 threads = e;
1389
1390 std::thread::sleep(Duration::from_millis(500));
1391
1392 continue;
1393
1394 }
1395 };
1396
1397 for thread in threads_unwr
1398 {
1399 thread.clean_local_queue();
1400
1401 let Some(thread) = Arc::into_inner(thread)
1402 else
1403 {
1404 panic!("assertion trap: ~SyncPeriodicTasks, a reference to ThreadHandler left somewhere");
1405 };
1406
1407 let _ = thread.hndl.join();
1408 }
1409
1410 break;
1411 }
1412 }
1413}
1414
1415
1416impl SyncPeriodicTasks
1417{
1418
1419
1420 /// Creates new instance. An amount of threads allocated for the task executor
1421 /// should be specified. All threads will be started immidiatly. For small
1422 /// tasks one thread will be enough. For a large amount of tasks, especially it
1423 /// tasks are waken up oftenly then at least two threads should be allocated.
1424 ///
1425 /// # Arguments
1426 ///
1427 /// * `threads_cnt` - a [NonZeroUsize] amount of threads.
1428 ///
1429 /// # Returns
1430 ///
1431 /// The [Result] is returned as alias [TimerResult].
1432 pub
1433 fn new(threads_cnt: NonZeroUsize) -> TimerResult<Self>
1434 {
1435 let spti = SharedPeriodicTasks::new()?;
1436 let poll_int = spti.timers_poll.get_poll_interruptor();
1437
1438 // wrap into the mutex because this instance is shared
1439 let spti = Arc::new(Mutex::new(spti));
1440
1441 let task_injector = Arc::new(Injector::<GlobalTasks>::new());
1442
1443 let mut thread_hndls: Vec<Arc<ThreadHandler>> = Vec::with_capacity(threads_cnt.get());
1444
1445 // spawn threads, all spawn threads will be parked by default.
1446 for i in 0..threads_cnt.get()
1447 {
1448 let handler =
1449 ThreadWorker::new(format!("timer_exec/{}s", i), task_injector.clone(), spti.clone(), poll_int.clone())?;
1450
1451 thread_hndls.push(Arc::new(handler));
1452 }
1453
1454 let thread_hndls = Arc::new(thread_hndls);
1455
1456 // Lock the instance to initialze the `thread_pool` variable.
1457 let spti_lock = spti.lock().unwrap();
1458 spti_lock.thread_pool.get_or_init(|| thread_hndls.clone());
1459
1460 // get a random thread to unpark it and start the process
1461 let thread =
1462 spti_lock
1463 .thread_pool
1464 .get()
1465 .unwrap()
1466 .get(random_range(0..threads_cnt.get()))
1467 .unwrap()
1468 .clone();
1469
1470 drop(spti_lock);
1471
1472 // unpark thread
1473 thread.unpark();
1474
1475 let inner =
1476 SyncPeriodicTasksInner
1477 {
1478 poll_int: poll_int,
1479 task_injector: task_injector,
1480 };
1481
1482 return Ok(
1483 Self
1484 {
1485 threads: Some(thread_hndls),
1486 inner: Arc::new(inner),
1487 }
1488 );
1489 }
1490
1491 /// Adds and spawns the task. The task is defined with generic `T` which is any
1492 /// datatype which implements [PeriodicTask].
1493 ///
1494 /// # Arguments
1495 ///
1496 /// * `task_name` - a task name. used only for identification purposes in debug messages.
1497 ///
1498 /// * `task` - a task which should be executed. It should implenet [PeriodicTask].
1499 ///
1500 /// * `task_time` - [PeriodicTaskTime] a time when the task must be spawned.
1501 ///
1502 /// # Returns
1503 ///
1504 /// The [Result] is returned as alias [TimerResult].
1505 ///
1506 /// # Example
1507 ///
1508 /// ```ignore
1509 /// #[derive(Debug)]
1510 /// struct TaskStruct1
1511 /// {
1512 /// a1: u64,
1513 /// s: Sender<u64>,
1514 /// }
1515 ///
1516 /// impl TaskStruct1
1517 /// {
1518 /// fn new(a1: u64, s: Sender<u64>) -> Self
1519 /// {
1520 /// return Self{ a1: a1, s };
1521 /// }
1522 /// }
1523 ///
1524 /// impl PeriodicTask for TaskStruct1
1525 /// {
1526 /// fn exec(&mut self) -> PeriodicTaskResult
1527 /// {
1528 /// println!("taskstruct1 val: {}", self.a1);
1529 /// let _ = self.s.send(self.a1);
1530 /// return PeriodicTaskResult::Ok;
1531 /// }
1532 /// }
1533 ///
1534 /// fn main()
1535 /// {
1536 /// let s = SyncPeriodicTasks::new(1.try_into().unwrap()).unwrap();
1537 ///
1538 /// let (send, recv) = mpsc::channel::<u64>();
1539 ///
1540 /// let task1 = TaskStruct1::new(2, send);
1541 ///
1542 /// let task1_ptt = PeriodicTaskTime::exact_time(AbsoluteTime::now() + RelativeTime::new_time(3, 0));
1543 ///
1544 /// let task1_guard = s.add("task1", task1, task1_ptt).unwrap();
1545 ///
1546 /// println!("added");
1547 ///
1548 /// let val = recv.recv();
1549 ///
1550 /// println!("{:?}", val);
1551 ///
1552 /// drop(task1_guard);
1553 /// }
1554 /// ```
1555 pub
1556 fn add<T>(&self, task_name: impl Into<String>, task: T, task_time: PeriodicTaskTime) -> TimerResult<PeriodicTaskGuard>
1557 where T: PeriodicTask
1558 {
1559 let task_int: PeriodicTaskHndl = Box::new(task);
1560
1561 let task_name_str: String = task_name.into();
1562
1563 let period_task_guard =
1564 Arc::new(PeriodicTaskGuardInner::new(task_name_str.clone(), task_int)?);
1565
1566
1567 /* let period_task_ticket =
1568 PeriodicTaskTicket::new(task_name_str.clone(), task_time, Arc::downgrade(&period_task_guard));*/
1569
1570 let (mpsc_send, mpsc_recv) = mpsc::channel();
1571
1572 self.inner.send_global_cmd(GlobalTasks::AddTask(task_name_str.clone(), task_time, period_task_guard.clone(), Some(mpsc_send)) )?;
1573
1574 let _ =
1575 mpsc_recv
1576 .recv()
1577 .map_err(|e|
1578 map_timer_err!(TimerErrorType::ExternalError, "mpsc error: {}", e)
1579 )??;
1580
1581 let ret =
1582 PeriodicTaskGuard
1583 {
1584 task_name: task_name_str,
1585 guard: Some(period_task_guard),
1586 spt: self.inner.clone()
1587 };
1588
1589 return Ok(ret);
1590 }
1591
1592 /// Adds and spawns the task in form of a closure.
1593 ///
1594 /// # Arguments
1595 ///
1596 /// * `task_name` - a task name. used only for identification purposes in debug messages.
1597 ///
1598 /// * `task_time` - [PeriodicTaskTime] a time when the task must be spawned.
1599 ///
1600 /// * `clo` - a function to exec on timeout. The function must return [PeriodicTaskResult].
1601 ///
1602 /// # Returns
1603 ///
1604 /// The [Result] is returned as alias [TimerResult].
1605 ///
1606 /// # Example
1607 ///
1608 /// ```ignore
1609 ///
1610 /// let task1_ptt = PeriodicTaskTime::exact_time(AbsoluteTime::now() + RelativeTime::new_time(3, 0));
1611 ///
1612 /// let (send, recv) = mpsc::channel::<u64>();
1613 ///
1614 /// let task1_guard =
1615 /// s.add_closure("task2", task1_ptt,
1616 /// move ||
1617 /// {
1618 /// println!("test output");
1619 /// send.send(1).unwrap();
1620 ///
1621 /// return PeriodicTaskResult::Ok;
1622 /// }
1623 /// ).unwrap();
1624 ///
1625 /// let val = recv.recv();
1626 ///
1627 /// println!("{:?}", val);
1628 /// ```
1629 pub
1630 fn add_closure<F>(&self, task_name: impl Into<String>, task_time: PeriodicTaskTime, clo: F) -> TimerResult<PeriodicTaskGuard>
1631 where F: 'static + FnMut() -> PeriodicTaskResult + Send
1632 {
1633 let closure_task = SyncCallableOper{ op: Box::new(clo) };
1634
1635 return self.add(task_name, closure_task, task_time);
1636 }
1637 /// Checks if any thread have crashed and no longer works.
1638 ///
1639 /// # Returns
1640 ///
1641 /// A [Option] is retuerned with the inner data:
1642 ///
1643 /// * [Option::Some] with the thread name [String] that have quit.
1644 ///
1645 /// * [Option::None] indicating that everthing is fine.
1646 pub
1647 fn check_thread_status(&self) -> Option<String>
1648 {
1649 for thread in self.threads.as_ref().unwrap().iter()
1650 {
1651 if let None = thread.thread_flag.upgrade()
1652 {
1653 return Some(thread.hndl.thread().name().unwrap().to_string());
1654 }
1655 }
1656
1657 return None;
1658 }
1659}
1660
1661#[cfg(test)]
1662mod tests
1663{
1664 use core::fmt;
1665 use std::{sync::mpsc::{self, RecvTimeoutError, Sender}, time::{Duration, Instant}};
1666
1667 use crate::{periodic_task::sync_tasks::{PeriodicTask, PeriodicTaskResult, PeriodicTaskTime, SyncPeriodicTasks}, AbsoluteTime, RelativeTime};
1668
1669 #[derive(Debug)]
1670 struct TaskStruct1
1671 {
1672 a1: u64,
1673 s: Sender<u64>,
1674 }
1675
1676 impl TaskStruct1
1677 {
1678 fn new(a1: u64, s: Sender<u64>) -> Self
1679 {
1680 return Self{ a1: a1, s };
1681 }
1682 }
1683
1684 impl PeriodicTask for TaskStruct1
1685 {
1686 fn exec(&mut self) -> PeriodicTaskResult
1687 {
1688 println!("taskstruct1 val: {}", self.a1);
1689
1690 let _ = self.s.send(self.a1);
1691
1692 return PeriodicTaskResult::Ok;
1693 }
1694 }
1695
1696 #[derive(Debug)]
1697 struct TaskStruct2
1698 {
1699 a1: u64,
1700 s: Sender<u64>,
1701 }
1702
1703 impl TaskStruct2
1704 {
1705 fn new(a1: u64, s: Sender<u64>) -> Self
1706 {
1707 return Self{ a1: a1, s };
1708 }
1709 }
1710
1711 impl PeriodicTask for TaskStruct2
1712 {
1713 fn exec(&mut self) -> PeriodicTaskResult
1714 {
1715 println!("taskstruct2 val: {}", self.a1);
1716
1717 self.s.send(self.a1).unwrap();
1718
1719 return PeriodicTaskResult::TaskReSchedule(PeriodicTaskTime::exact_time(AbsoluteTime::now() + RelativeTime::new_time(2, 0)));
1720 }
1721 }
1722
1723 impl<F> PeriodicTask for F
1724 where F: 'static + FnMut() + Send + fmt::Debug
1725 {
1726 fn exec(&mut self) -> PeriodicTaskResult
1727 {
1728 (self)();
1729 return PeriodicTaskResult::Ok;
1730 }
1731 }
1732
1733 #[test]
1734 fn ttt()
1735 {
1736 let s = SyncPeriodicTasks::new(1.try_into().unwrap()).unwrap();
1737
1738 let task1_ptt = PeriodicTaskTime::exact_time(AbsoluteTime::now() + RelativeTime::new_time(3, 0));
1739
1740 let (send, recv) = mpsc::channel::<u64>();
1741
1742 let task1_guard =
1743 s.add_closure("task2", task1_ptt,
1744 move ||
1745 {
1746 println!("test output");
1747 send.send(1).unwrap();
1748
1749 return PeriodicTaskResult::Ok;
1750 }
1751 ).unwrap();
1752
1753
1754 println!("added");
1755 let val = recv.recv();
1756
1757 println!("{:?}", val);
1758
1759 drop(task1_guard);
1760 }
1761
1762 #[test]
1763 fn test1_absolute_simple()
1764 {
1765 let s = SyncPeriodicTasks::new(1.try_into().unwrap()).unwrap();
1766
1767 let (send, recv) = mpsc::channel::<u64>();
1768
1769 let task1 = TaskStruct1::new(2, send);
1770 let task1_ptt = PeriodicTaskTime::exact_time(AbsoluteTime::now() + RelativeTime::new_time(3, 0));
1771 let task1_guard = s.add("task1", task1, task1_ptt).unwrap();
1772
1773 println!("added");
1774 let val = recv.recv();
1775
1776 println!("{:?}", val);
1777
1778 drop(task1_guard);
1779 }
1780
1781 #[test]
1782 fn test1_relative_simple()
1783 {
1784 let s = SyncPeriodicTasks::new(1.try_into().unwrap()).unwrap();
1785
1786 let (send, recv) = mpsc::channel::<u64>();
1787
1788 let task1 = TaskStruct1::new(2, send);
1789 let task1_ptt = PeriodicTaskTime::interval(RelativeTime::new_time(1, 0));
1790 let task1_guard = s.add("task1", task1, task1_ptt).unwrap();
1791
1792 let mut s = Instant::now();
1793
1794 for i in 0..3
1795 {
1796 let val = recv.recv().unwrap();
1797
1798 let e = s.elapsed();
1799 s = Instant::now();
1800
1801 println!("{}: {:?} {:?} {}", i, val, e, e.as_micros());
1802
1803 assert!(999000 < e.as_micros() && e.as_micros() < 10001200);
1804 assert_eq!(val, 2);
1805 }
1806
1807 drop(task1_guard);
1808
1809 std::thread::sleep(Duration::from_millis(100));
1810
1811 return;
1812 }
1813
1814 #[test]
1815 fn test1_relative_resched_to_abs()
1816 {
1817 let s = SyncPeriodicTasks::new(1.try_into().unwrap()).unwrap();
1818
1819 let (send, recv) = mpsc::channel::<u64>();
1820
1821 let task1 = TaskStruct2::new(2, send);
1822 let task1_ptt = PeriodicTaskTime::interval(RelativeTime::new_time(1, 0));
1823 let task1_guard = s.add("task1", task1, task1_ptt).unwrap();
1824
1825 let s = Instant::now();
1826 match recv.recv_timeout(Duration::from_millis(1150))
1827 {
1828 Ok(rcv_a) =>
1829 {
1830 let e = s.elapsed();
1831 println!("{:?} {}", e, e.as_micros());
1832 assert_eq!(rcv_a, 2);
1833 assert!(999051 < e.as_micros() && e.as_micros() < 1000551);
1834 },
1835 Err(RecvTimeoutError::Timeout) =>
1836 panic!("tineout"),
1837 Err(e) =>
1838 panic!("{}", e),
1839 }
1840
1841 let s = Instant::now();
1842 match recv.recv_timeout(Duration::from_millis(2100))
1843 {
1844 Ok(rcv_a) =>
1845 {
1846 let e = s.elapsed();
1847 println!("{:?} {}", e, e.as_micros());
1848 assert_eq!(rcv_a, 2);
1849 assert!(1999642 < e.as_micros() && e.as_micros() < 2000342);
1850 },
1851 Err(RecvTimeoutError::Timeout) =>
1852 panic!("tineout"),
1853 Err(e) =>
1854 panic!("{}", e),
1855 }
1856
1857 let s = Instant::now();
1858 match recv.recv_timeout(Duration::from_millis(2100))
1859 {
1860 Ok(rcv_a) =>
1861 {
1862 let e = s.elapsed();
1863 println!("{:?} {}", e, e.as_micros());
1864 assert_eq!(rcv_a, 2);
1865 assert!(1999642 < e.as_micros() && e.as_micros() < 2000342);
1866 },
1867 Err(RecvTimeoutError::Timeout) =>
1868 panic!("tineout"),
1869 Err(e) =>
1870 panic!("{}", e),
1871 }
1872
1873 drop(task1_guard);
1874
1875 std::thread::sleep(Duration::from_millis(100));
1876
1877 return;
1878 }
1879
1880 #[test]
1881 fn test1_relative_simple_resched()
1882 {
1883 let s = SyncPeriodicTasks::new(1.try_into().unwrap()).unwrap();
1884
1885 let (send, recv) = mpsc::channel::<u64>();
1886
1887 let task1 = TaskStruct1::new(2, send);
1888 let task1_ptt =
1889 PeriodicTaskTime::interval(
1890 RelativeTime::new_time(1, 0)
1891 );
1892 let task1_guard = s.add("task1", task1, task1_ptt).unwrap();
1893
1894 let mut s = Instant::now();
1895
1896 for i in 0..3
1897 {
1898 let val = recv.recv().unwrap();
1899
1900 let e = s.elapsed();
1901 s = Instant::now();
1902
1903 println!("{}: {:?} {:?} {}", i, val, e, e.as_micros());
1904
1905 assert!(999000 < e.as_micros() && e.as_micros() < 10001200);
1906 assert_eq!(val, 2);
1907 }
1908
1909 task1_guard
1910 .reschedule_task(
1911 PeriodicTaskTime::exact_time(AbsoluteTime::now() + RelativeTime::new_time(2, 0))
1912 )
1913 .unwrap();
1914
1915 s = Instant::now();
1916 let val = recv.recv().unwrap();
1917 let e = s.elapsed();
1918
1919 println!("resched: {:?} {:?} {}", val, e, e.as_micros());
1920
1921 assert!(1999000 < e.as_micros() && e.as_micros() < 2000560);
1922
1923 let val = recv.recv_timeout(Duration::from_secs(3));
1924
1925 assert_eq!(val.is_err(), true);
1926 assert_eq!(val.err().unwrap(), RecvTimeoutError::Timeout);
1927
1928 drop(task1_guard);
1929
1930 std::thread::sleep(Duration::from_millis(100));
1931
1932 return;
1933 }
1934
1935 #[test]
1936 fn test1_relative_simple_cancel()
1937 {
1938 let s = SyncPeriodicTasks::new(1.try_into().unwrap()).unwrap();
1939
1940 let (send, recv) = mpsc::channel::<u64>();
1941
1942 let task1 = TaskStruct1::new(0, send.clone());
1943 let task1_ptt =
1944 PeriodicTaskTime::interval(RelativeTime::new_time(1, 0));
1945
1946 let task2 = TaskStruct1::new(1, send.clone());
1947 let task2_ptt =
1948 PeriodicTaskTime::interval(RelativeTime::new_time(2, 0));
1949
1950 let task3 = TaskStruct1::new(2, send);
1951 let task3_ptt =
1952 PeriodicTaskTime::interval(RelativeTime::new_time(0, 500_000_000));
1953
1954 let task1_guard = s.add("task1", task1, task1_ptt).unwrap();
1955 let task2_guard = s.add("task2", task2, task2_ptt).unwrap();
1956 let task3_guard = s.add("task3", task3, task3_ptt).unwrap();
1957
1958 let mut a_cnt: [u8; 3] = [0_u8; 3];
1959
1960 let end = AbsoluteTime::now() + RelativeTime::new_time(5, 100_000_000);
1961
1962 while AbsoluteTime::now() < end
1963 {
1964 match recv.recv_timeout(Duration::from_millis(1))
1965 {
1966 Ok(rcv_a) =>
1967 a_cnt[rcv_a as usize] += 1,
1968 Err(RecvTimeoutError::Timeout) =>
1969 continue,
1970 Err(e) =>
1971 panic!("{}", e),
1972 }
1973
1974
1975 }
1976
1977 assert_eq!(a_cnt[0], 5);
1978 assert_eq!(a_cnt[1], 2);
1979 assert_eq!(a_cnt[2], 10);
1980
1981 // drop task #3
1982 task3_guard.suspend_task().unwrap();
1983
1984
1985 let end = AbsoluteTime::now() + RelativeTime::new_time(5, 100_000_000);
1986
1987 while AbsoluteTime::now() < end
1988 {
1989 match recv.recv_timeout(Duration::from_millis(1))
1990 {
1991 Ok(rcv_a) =>
1992 a_cnt[rcv_a as usize] += 1,
1993 Err(RecvTimeoutError::Timeout) =>
1994 continue,
1995 Err(e) =>
1996 panic!("{}", e),
1997 }
1998
1999
2000 }
2001
2002 assert_eq!(a_cnt[0] > 5, true);
2003 assert_eq!(a_cnt[1] > 2, true);
2004 assert!((a_cnt[2] == 10 || a_cnt[2] == 11));
2005
2006 drop(task1_guard);
2007 drop(task2_guard);
2008 drop(task3_guard);
2009
2010 let end = AbsoluteTime::now() + RelativeTime::new_time(5, 100_000_000);
2011
2012 while AbsoluteTime::now() < end
2013 {
2014 match recv.recv_timeout(Duration::from_millis(1))
2015 {
2016 Ok(rcv_a) =>
2017 a_cnt[rcv_a as usize] += 1,
2018 Err(RecvTimeoutError::Timeout) =>
2019 continue,
2020 Err(_) =>
2021 break,
2022 }
2023
2024
2025 }
2026
2027 assert_eq!(AbsoluteTime::now() < end, true);
2028
2029
2030
2031 return;
2032 }
2033
2034 // freebsd text failed with src/periodic_task/sync_tasks.rs:1784:9 left: 6
2035 #[test]
2036 fn test2_multithread_1()
2037 {
2038 let s = SyncPeriodicTasks::new(2.try_into().unwrap()).unwrap();
2039
2040 let (send, recv) = mpsc::channel::<u64>();
2041
2042 let task1 = TaskStruct1::new(0, send.clone());
2043 let task1_ptt =
2044 PeriodicTaskTime::interval(RelativeTime::new_time(1, 0));
2045
2046 let task2 = TaskStruct1::new(1, send.clone());
2047 let task2_ptt =
2048 PeriodicTaskTime::interval(RelativeTime::new_time(2, 0));
2049
2050 let task3 = TaskStruct1::new(2, send.clone());
2051 let task3_ptt =
2052 PeriodicTaskTime::interval(RelativeTime::new_time(0, 500_000_000));
2053
2054 let task4 = TaskStruct1::new(3, send.clone());
2055 let task4_ptt =
2056 PeriodicTaskTime::interval(RelativeTime::new_time(0, 200_000_000));
2057
2058 let task5 = TaskStruct1::new(4, send.clone());
2059 let task5_ptt =
2060 PeriodicTaskTime::exact_time(AbsoluteTime::now() + RelativeTime::new_time(5, 0));
2061
2062 let task1_guard = s.add("task1", task1, task1_ptt).unwrap();
2063 let task2_guard = s.add("task2", task2, task2_ptt).unwrap();
2064 let task3_guard = s.add("task3", task3, task3_ptt).unwrap();
2065 let task4_guard = s.add("task4", task4, task4_ptt).unwrap();
2066 let task5_guard = s.add("task5", task5, task5_ptt).unwrap();
2067
2068
2069 let mut a_cnt: [u8; 5] = [0_u8; 5];
2070
2071 let end = AbsoluteTime::now() + RelativeTime::new_time(5, 500_000_000);
2072
2073 while AbsoluteTime::now() < end
2074 {
2075 match recv.recv_timeout(Duration::from_millis(1))
2076 {
2077 Ok(rcv_a) =>
2078 a_cnt[rcv_a as usize] += 1,
2079 Err(RecvTimeoutError::Timeout) =>
2080 continue,
2081 Err(e) =>
2082 panic!("{}", e),
2083 }
2084
2085
2086 }
2087
2088 println!("{:?}", a_cnt);
2089
2090 assert!(a_cnt[0] == 5);
2091 assert!(a_cnt[1] == 2);
2092 assert!((a_cnt[2] == 10 || a_cnt[2] == 11));
2093 assert!(a_cnt[3] == 27);
2094 assert!(a_cnt[4] == 1);
2095
2096 task5_guard.reschedule_task(PeriodicTaskTime::exact_time(AbsoluteTime::now() + RelativeTime::new_time(0, 500_000_000))).unwrap();
2097
2098 let end = AbsoluteTime::now() + RelativeTime::new_time(0, 600_000_000);
2099
2100 while AbsoluteTime::now() < end
2101 {
2102 match recv.recv_timeout(Duration::from_millis(1))
2103 {
2104 Ok(rcv_a) =>
2105 a_cnt[rcv_a as usize] += 1,
2106 Err(RecvTimeoutError::Timeout) =>
2107 continue,
2108 Err(e) =>
2109 panic!("{}", e),
2110 }
2111 }
2112
2113 println!("{:?}", a_cnt);
2114 assert!(a_cnt[4] == 2);
2115
2116 drop(task5_guard);
2117 drop(task4_guard);
2118 drop(task3_guard);
2119 drop(task2_guard);
2120
2121 let end = AbsoluteTime::now() + RelativeTime::new_time(2, 1000);
2122
2123 while AbsoluteTime::now() < end
2124 {
2125 match recv.recv_timeout(Duration::from_millis(1))
2126 {
2127 Ok(rcv_a) =>
2128 a_cnt[rcv_a as usize] += 1,
2129 Err(RecvTimeoutError::Timeout) =>
2130 continue,
2131 Err(e) =>
2132 panic!("{}", e),
2133 }
2134 }
2135
2136
2137 println!("{:?}", a_cnt);
2138 assert_eq!(a_cnt[4], 2);
2139 assert_eq!(a_cnt[0], 8);
2140
2141 drop(task1_guard);
2142
2143 std::thread::sleep(Duration::from_millis(10));
2144 return;
2145 }
2146}