1use std::
19{
20 cell::OnceCell,
21 cmp,
22 collections::HashMap,
23 fmt,
24 num::NonZeroUsize,
25 sync::
26 {
27 Arc, Mutex, TryLockError, Weak, atomic::{AtomicBool, Ordering}, mpsc::{self, Receiver, Sender}
28 },
29 thread::JoinHandle,
30 time::Duration
31};
32
33use crossbeam_deque::{Injector, Steal};
34use rand::random_range;
35
36use crate::
37{
38 AbsoluteTime,
39 FdTimerCom,
40 RelativeTime,
41 TimerPoll,
42 TimerReadRes,
43 error::{TimerError, TimerErrorType, TimerResult},
44 map_timer_err, timer_err,
45 timer_portable::
46 {
47 AsTimerId, PollEventType, PolledTimerFd, TimerExpMode, TimerFlags, TimerId, TimerType, poll::PollInterrupt, timer::TimerFd
48 }
49};
50
51#[derive(Debug, Clone, Copy, PartialEq, Eq)]
54pub enum PeriodicTaskResult
55{
56 Ok,
58
59 CancelTask,
65
66 TaskReSchedule(PeriodicTaskTime)
69}
70
71pub trait PeriodicTask: Send + 'static
81{
82 fn exec(&mut self) -> PeriodicTaskResult;
84}
85
86pub type PeriodicTaskHndl = Box<dyn PeriodicTask>;
88
89
90#[derive(Debug)]
95pub(crate) enum GlobalTasks
96{
97 AddTask( String, PeriodicTaskTime, Arc<PeriodicTaskGuardInner>, Option<mpsc::Sender<TimerResult<()>>> ),
100
101 RemoveTask( Arc<PeriodicTaskGuardInner>, Option<mpsc::Sender<TimerResult<()>>> ),
103
104 ReschedTask( Weak<PeriodicTaskGuardInner>, PeriodicTaskTime, Option<mpsc::Sender<TimerResult<()>>> ),
106
107 SuspendTask( Weak<PeriodicTaskGuardInner>, Option<mpsc::Sender<TimerResult<()>>> ),
110
111 ResumeTask( Weak<PeriodicTaskGuardInner>, Option<mpsc::Sender<TimerResult<()>>> ),
113}
114
115#[derive(Debug)]
117pub enum ThreadTask
118{
119 TaskExec( Arc<NewTaskTicket> )
121}
122
123#[derive(Debug)]
127pub struct NewTaskTicket
128{
129 task_thread: Arc<ThreadHandler>,
131
132 ptgi: Weak<PeriodicTaskGuardInner>,
135}
136
137impl NewTaskTicket
138{
139 fn new(task_thread: Arc<ThreadHandler>, ptgi: Weak<PeriodicTaskGuardInner>) -> Self
140 {
141 return
142 Self
143 {
144 task_thread:
145 task_thread,
146 ptgi:
147 ptgi
148 };
149 }
150
151 fn send_task(this: Arc<NewTaskTicket>, task_rep_count: u64, thread_hndl_cnt: usize)
153 {
154 let strong_cnt = Arc::strong_count(&this);
155 let thread = this.task_thread.clone();
156
157 for _ in 0..task_rep_count
158 {
159 thread.send_task(this.clone(), strong_cnt < 2 && thread_hndl_cnt > 1);
160 }
161 }
162}
163
164#[derive(Debug)]
167pub(crate) struct PeriodicTaskTicket
168{
169 task_name: String,
171
172 sync_timer: PolledTimerFd<TimerFd>,
173
174 ptt: PeriodicTaskTime,
176
177 weak_ticket: Weak<NewTaskTicket>,
181
182 ptg: Weak<PeriodicTaskGuardInner>,
185}
186
187impl Ord for PeriodicTaskTicket
188{
189 fn cmp(&self, other: &Self) -> cmp::Ordering
190 {
191 return
192 self
193 .sync_timer
194 .get_inner()
195 .as_timer_id()
196 .cmp(&other.sync_timer.get_inner().as_timer_id());
197 }
198}
199
200impl PartialOrd for PeriodicTaskTicket
201{
202 fn partial_cmp(&self, other: &Self) -> Option<cmp::Ordering>
203 {
204 return Some(self.cmp(other));
205 }
206}
207
208impl PartialEq for PeriodicTaskTicket
209{
210 fn eq(&self, other: &Self) -> bool
211 {
212 return
213 self.task_name == other.task_name &&
214 self.sync_timer.get_inner().as_timer_id() == other.sync_timer.get_inner().as_timer_id();
215 }
216}
217
218impl Eq for PeriodicTaskTicket {}
219
220impl PeriodicTaskTicket
221{
222 fn new(task_name: String, ptt: PeriodicTaskTime, ptg: Arc<PeriodicTaskGuardInner>, poll: &TimerPoll) -> TimerResult<Self>
223 {
224 let sync_timer =
226 TimerFd::new(task_name.clone().into(), TimerType::CLOCK_REALTIME,
227 TimerFlags::TFD_CLOEXEC | TimerFlags::TFD_NONBLOCK)
228 .map_err(|e|
229 map_timer_err!(TimerErrorType::TimerError(e.get_errno()), "cannot setup timer for task: '{}'", task_name)
230 )
231 .and_then(|timer|
232 poll.add(timer)
233 )?;
234
235 let ptt =
236 Self
237 {
238 task_name: task_name,
239 sync_timer: sync_timer,
240 ptt: ptt,
241 weak_ticket: Weak::new(),
242 ptg: Arc::downgrade(&ptg),
243 };
244
245 ptt.set_timer()?;
246
247 return Ok(ptt);
248 }
249
250 #[inline]
251 fn set_timer(&self) -> TimerResult<()>
252 {
253 return self.get_timer_time().set_timer(self.sync_timer.get_inner());
254 }
255
256 #[inline]
257 fn unset_timer(&self) -> TimerResult<()>
258 {
259 return
260 self
261 .sync_timer
262 .get_inner()
263 .get_timer()
264 .unset_time()
265 .map_err(|e|
266 map_timer_err!(TimerErrorType::TimerError(e.get_errno()), "unsetting timer '{}' returned error: {}", self.sync_timer, e)
267 );
268 }
269
270 fn update_task_time(&mut self, ptt_new: PeriodicTaskTime) -> TimerResult<()>
271 {
272 self.ptt = ptt_new;
273
274 return self.set_timer();
275 }
276
277 fn get_task_guard(&self) -> TimerResult<Arc<PeriodicTaskGuardInner>>
278 {
279 return
280 self
281 .ptg
282 .upgrade()
283 .ok_or_else(||
284 map_timer_err!(TimerErrorType::ReferenceGone, "task: '{}' reference to timer has gone",
285 self.task_name)
286 );
287 }
288
289 #[inline]
290 fn get_timer_time(&self) -> &PeriodicTaskTime
291 {
292 return &self.ptt;
293 }
294}
295
296#[derive(Debug)]
300pub struct PeriodicTaskGuard
301{
302 task_name: String,
304
305 guard: Option<Arc<PeriodicTaskGuardInner>>,
313
314 spt: Arc<SyncPeriodicTasksInner>
317}
318
319impl Drop for PeriodicTaskGuard
320{
321 fn drop(&mut self)
322 {
323 let guard = self.guard.take().unwrap();
324
325 let _ = self.spt.send_global_cmd(GlobalTasks::RemoveTask(guard, None));
326
327 return;
328 }
329}
330
331impl PeriodicTaskGuard
332{
333 pub
361 fn reschedule_task(&self, ptt: PeriodicTaskTime) -> TimerResult<()>
362 {
363 let weak_ptgi = Arc::downgrade(self.guard.as_ref().unwrap());
364
365 let (snd, rcv) = mpsc::channel::<Result<(), TimerError>>();
366
367 self.spt.send_global_cmd(GlobalTasks::ReschedTask(weak_ptgi, ptt, Some(snd)))?;
368
369 return
370 rcv
371 .recv_timeout(Duration::from_secs(10))
372 .map_err(|e|
373 map_timer_err!(TimerErrorType::MpscTimeout, "reschedule_task(), task name: '{}', MPSC rcv timeout error: '{}'",
374 self.task_name, e)
375 )?;
376 }
377
378 pub
398 fn suspend_task(&self) -> TimerResult<()>
399 {
400 let weak_ptgi = Arc::downgrade(self.guard.as_ref().unwrap());
401
402 let (snd, rcv) = mpsc::channel::<Result<(), TimerError>>();
403
404 self.spt.send_global_cmd(GlobalTasks::SuspendTask(weak_ptgi, Some(snd)))?;
405
406 return
407 rcv
408 .recv_timeout(Duration::from_secs(10))
409 .map_err(|e|
410 map_timer_err!(TimerErrorType::MpscTimeout, "suspend_task(), task name: '{}', MPSC rcv timeout error: '{}'",
411 self.task_name, e)
412 )?;
413 }
414
415 pub
435 fn resume_task(&self) -> TimerResult<()>
436 {
437 let weak_ptgi = Arc::downgrade(self.guard.as_ref().unwrap());
438
439 let (snd, rcv) = mpsc::channel::<Result<(), TimerError>>();
440
441 self.spt.send_global_cmd(GlobalTasks::ResumeTask(weak_ptgi, Some(snd)))?;
442
443 return
444 rcv
445 .recv_timeout(Duration::from_secs(10))
446 .map_err(|e|
447 map_timer_err!(TimerErrorType::MpscTimeout, "resume_task(), task name: '{}', MPSC rcv timeout error: '{}'",
448 self.task_name, e)
449 )?;
450 }
451}
452
453#[derive(Debug, Clone, Copy, PartialEq, Eq)]
457pub enum PeriodicTaskTime
458{
459 Absolute(TimerExpMode<AbsoluteTime>),
461
462 Relative(TimerExpMode<RelativeTime>),
464}
465
466impl From<TimerExpMode<AbsoluteTime>> for PeriodicTaskTime
467{
468 fn from(value: TimerExpMode<AbsoluteTime>) -> Self
469 {
470 return Self::Absolute(value);
471 }
472}
473
474impl From<TimerExpMode<RelativeTime>> for PeriodicTaskTime
475{
476 fn from(value: TimerExpMode<RelativeTime>) -> Self
477 {
478 return Self::Relative(value);
479 }
480}
481
482impl fmt::Display for PeriodicTaskTime
483{
484 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result
485 {
486 match self
487 {
488 Self::Absolute(t) =>
489 write!(f, "{}", t),
490 Self::Relative(t) =>
491 write!(f, "{}", t),
492 }
493 }
494}
495
496impl PeriodicTaskTime
497{
498 #[inline]
501 pub
502 fn exact_time(abs_time: AbsoluteTime) -> Self
503 {
504 return Self::Absolute(TimerExpMode::<AbsoluteTime>::new_oneshot(abs_time));
505 }
506
507 #[inline]
509 pub
510 fn interval(rel_time: RelativeTime) -> Self
511 {
512 return Self::Relative(TimerExpMode::<RelativeTime>::new_interval(rel_time));
513 }
514
515 #[inline]
524 pub
525 fn interval_with_start_delay(start_del_time: RelativeTime, rel_int_time: RelativeTime) -> Self
526 {
527 return Self::Relative(TimerExpMode::<RelativeTime>::new_interval_with_init_delay(start_del_time, rel_int_time));
528 }
529
530 fn set_timer(&self, timer_fd: &TimerFd) -> TimerResult<()>
532 {
533 match *self
534 {
535 Self::Absolute(timer_exp_mode) =>
536 return
537 timer_fd
538 .get_timer()
539 .set_time(timer_exp_mode)
540 .map_err(|e|
541 map_timer_err!(TimerErrorType::TimerError(e.get_errno()), "cannot set time '{}' for timer: '{}'", timer_exp_mode, timer_fd )
542 ),
543 Self::Relative(timer_exp_mode) =>
544 return
545 timer_fd
546 .get_timer()
547 .set_time(timer_exp_mode)
548 .map_err(|e|
549 map_timer_err!(TimerErrorType::TimerError(e.get_errno()), "cannot set time '{}' for timer: '{}'", timer_exp_mode, timer_fd )
550 ),
551 }
552 }
553}
554
555
556pub(crate) struct PeriodicTaskGuardInner
558{
559 task_name: String,
560
561 task: Mutex<PeriodicTaskHndl>,
567}
568
569impl fmt::Debug for PeriodicTaskGuardInner
570{
571 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result
572 {
573 f.debug_struct("PeriodicTaskGuardInner").field("task_name", &self.task_name).finish()
574 }
575}
576
577impl fmt::Display for PeriodicTaskGuardInner
578{
579 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result
580 {
581 write!(f, "task name: '{}'", self.task_name)
582 }
583}
584
585impl PeriodicTaskGuardInner
586{
587 fn new(task_name: String, task_inst: PeriodicTaskHndl) -> TimerResult<Self>
588 {
589 return Ok(Self { task_name: task_name, task: Mutex::new(task_inst) });
590 }
591
592
593}
594
595struct ThreadWorker
597{
598 thread_name: String,
600
601 global_task_injector: Arc<Injector<GlobalTasks>>,
603
604 local_thread_inj: Arc<Injector<ThreadTask>>,
606
607 thread_run_flag: Arc<AtomicBool>,
610
611 spti: Arc<Mutex<SharedPeriodicTasks>>,
616
617 thread_last_id: usize,
619
620 poll_int: PollInterrupt,
622
623 tx_err: Option<Sender<TimerError>>,
625}
626
627impl ThreadWorker
628{
629 fn new(
630 thread_name: String,
631 global_task_injector: Arc<Injector<GlobalTasks>>,
632 spti: Arc<Mutex<SharedPeriodicTasks>>,
633 poll_int: PollInterrupt,
634 tx_err: Option<Sender<TimerError>>
635 ) -> TimerResult<ThreadHandler>
636 {
637 let local_thread_inj = Arc::new(Injector::<ThreadTask>::new());
638 let thread_run_flag = Arc::new(AtomicBool::new(true));
639 let thread_run_flag_weak = Arc::downgrade(&thread_run_flag);
640
641 let worker =
642 ThreadWorker
643 {
644 thread_name:
645 thread_name.clone(),
646 global_task_injector:
647 global_task_injector,
648 local_thread_inj:
649 local_thread_inj.clone(),
650 thread_run_flag:
651 thread_run_flag,
652 spti:
653 spti,
654 thread_last_id:
655 0,
656 poll_int:
657 poll_int,
658 tx_err:
659 tx_err
660 };
661
662 let thread_hndl =
663 std::thread::Builder::new()
664 .name(thread_name)
665 .spawn(|| worker.worker())
666 .map_err(|e|
667 map_timer_err!(TimerErrorType::SpawnError(e.kind()), "{}", e)
668 )?;
669
670
671 return Ok( ThreadHandler::new(thread_hndl, local_thread_inj, thread_run_flag_weak) );
672 }
673
674
675 fn worker(mut self) -> TimerResult<()>
676 {
677 std::thread::park();
679
680 while self.thread_run_flag.load(Ordering::Acquire) == true
681 {
682 while let Steal::Success(task) = self.local_thread_inj.steal()
684 {
685 match task
686 {
687 ThreadTask::TaskExec(task_exec) =>
688 {
689 let Some(ptgi) = task_exec.ptgi.upgrade()
690 else
691 {
692 continue;
694 };
695
696 match ptgi.task.lock().unwrap().exec()
698 {
699 PeriodicTaskResult::Ok =>
700 {},
701 PeriodicTaskResult::CancelTask =>
702 {
703 self
704 .global_task_injector
705 .push(GlobalTasks::SuspendTask(task_exec.ptgi.clone(), None));
706
707 let _ = self.poll_int.aquire().map(|v| v.interrupt_drop());
708 },
709 PeriodicTaskResult::TaskReSchedule(ptt) =>
710 {
711 self
712 .global_task_injector
713 .push(GlobalTasks::ReschedTask(task_exec.ptgi.clone(), ptt, None));
714
715 let _ = self.poll_int.aquire().map(|v| v.interrupt_drop());
716 }
717 }
718
719 drop(task_exec);
720 }
721 }
722 }
723
724 let spti_lock_res = self.spti.try_lock();
725
726 if let Ok(mut task_token) = spti_lock_res
727 {
728 let thread_hndl_cnt = task_token.thread_pool.get().unwrap().len();
729
730 while let Steal::Success(task) = self.global_task_injector.steal()
732 {
733 match task
734 {
735 GlobalTasks::AddTask(task_name, ptt, ptg, opt_err_ret) =>
736 {
737 if task_token.contains_task(&task_name) == true
738 {
739 if let Some(err_ret) = opt_err_ret
740 {
741 let err_msg =
742 map_timer_err!(TimerErrorType::Duplicate,
743 "thread: '{}', task: '{}' already exists", self.thread_name, task_name);
744
745 let _ = err_ret.send(Err(err_msg));
746 }
747
748 continue;
749 }
750
751 let period_task_ticket =
752 PeriodicTaskTicket::new(task_name.clone(), ptt, ptg, &task_token.timers_poll);
753
754 if let Err(e) = period_task_ticket
755 {
756 if let Some(err_ret) = opt_err_ret
757 {
758 let _ = err_ret.send(Err(e));
759 }
760
761 continue;
762 }
763
764 let period_task_ticket = period_task_ticket.unwrap();
765
766 task_token.insert_task(period_task_ticket);
767
768 if let Some(err_ret) = opt_err_ret
769 {
770 let _ = err_ret.send(Ok(()));
771 }
772 },
773 GlobalTasks::RemoveTask(ptg_arc, opt_err_ret) =>
774 {
775 let res = task_token.remove_task(&ptg_arc.task_name);
776
777 if let Err(e) = res
778 {
779 if let Some(err_ret) = opt_err_ret
780 {
781 let err_msg =
782 map_timer_err!(e.get_error_type(),
783 "thread: '{}', {}", self.thread_name, e.get_error_msg());
784
785 let _ = err_ret.send(Err(err_msg));
786 }
787
788 continue;
789 }
790
791 let ptt_ref = res.unwrap();
792
793 if let Err(e) = ptt_ref.unset_timer()
797 {
798 if let Some(err_ret) = opt_err_ret.as_ref()
799 {
800 let _ = err_ret.send(Err(e));
801 }
802
803 continue;
804 }
805
806 drop(ptt_ref);
807
808 drop(ptg_arc);
830 },
831 GlobalTasks::ReschedTask( ptg_weak, ptt, opt_err_ret ) =>
832 {
833 let Some(ptg_arc) = ptg_weak.upgrade()
834 else
835 {
836 continue;
838 };
839
840 let res_task = task_token.get_task_by_name(&ptg_arc.task_name);
841
842 let res =
843 match res_task
844 {
845 Ok(task) =>
846 {
847 let _ = task.unset_timer();
849
850 let res = task.update_task_time(ptt);
852
853 if let Err(e) = res
854 {
855 if let Some(err_ret) = opt_err_ret.as_ref()
856 {
857 let _ = err_ret.send(Err(e));
858 }
859
860 continue;
861 }
862
863 Ok(())
864 },
865 Err(err) =>
866 {
867 Err(err)
868 }
869 };
870
871 if let Some(err_ret) = opt_err_ret
872 {
873 let _ = err_ret.send(res);
874 }
875 },
876 GlobalTasks::SuspendTask(ptg_weak, opt_err_ret) =>
877 {
878 let Some(ptg_arc) = ptg_weak.upgrade()
879 else
880 {
881 continue;
883 };
884
885 let res_task = task_token.get_task_by_name(&ptg_arc.task_name);
895
896 if let Err(e) = res_task
897 {
898 if let Some(err_ret) = opt_err_ret.as_ref()
899 {
900 let _ = err_ret.send(Err(e));
901 }
902
903 continue;
904 }
905
906 let task = res_task.unwrap();
907
908 let res = task.unset_timer();
909
910 if let Some(err_ret) = opt_err_ret
911 {
912 let _ = err_ret.send(res);
913 }
914 },
915 GlobalTasks::ResumeTask(ptg_weak, opt_err_ret) =>
916 {
917 let Some(ptg_arc) = ptg_weak.upgrade()
918 else
919 {
920 continue;
922 };
923
924 let res_task = task_token.get_task_by_name(&ptg_arc.task_name);
934
935 if let Err(e) = res_task
936 {
937 if let Some(err_ret) = opt_err_ret.as_ref()
938 {
939 let _ = err_ret.send(Err(e));
940 }
941
942 continue;
943 }
944
945 let res = res_task.unwrap().set_timer();
946
947 if let Some(err_ret) = opt_err_ret.as_ref()
948 {
949 let _ = err_ret.send(res);
950 }
951 }
952 }
953 }
954
955 let Some(res) =
957 task_token
958 .timers_poll
959 .poll(Some(5000))?
960 else
961 {
962 continue;
963 };
964
965 for event in res
966 {
967 match event
968 {
969 PollEventType::TimerRes(timer_fd, timer_res) =>
970 {
971 let task_by_id =
972 task_token
973 .get_task_by_timer_id(timer_fd)
974 .map_err(|e|
975 map_timer_err!(e.get_error_type(), "thread: '{}', {}", self.thread_name, e.get_error_msg())
976 )
977 .map(|task|
978 (task.ptg.clone(), task.ptt.clone(), task.weak_ticket.upgrade(), task.task_name.clone())
979 );
980
981 let Ok((ptg, ptt, ticket_arc_opt, task_name)) = task_by_id
982 else
983 {
984 if let Some(tx_err) = self.tx_err.as_ref()
985 {
986 let _ = tx_err.send(task_by_id.err().unwrap());
987 }
988
989 continue;
990 };
991
992
993 let Some(ptg_arc) = ptg.upgrade()
995 else
996 {
997 if let Err(e) = task_token.remove_task(&task_name)
1002 {
1003 if let Some(tx_err) = self.tx_err.as_ref()
1004 {
1005 let _ = tx_err.send(e);
1006 }
1007 }
1008
1009 continue;
1011 };
1012
1013 let overflow_cnt: u64 =
1015 match timer_res
1016 {
1017 TimerReadRes::Ok(overfl) =>
1018 {
1019 overfl
1020 },
1021 TimerReadRes::Cancelled =>
1022 {
1023 self
1026 .global_task_injector
1027 .push(
1028 GlobalTasks::ReschedTask(ptg.clone(), ptt.clone(), None)
1029 );
1030
1031 continue;
1033 },
1034 TimerReadRes::WouldBlock =>
1035 {
1036 panic!("assertion trap: timer retuned WouldBlock, {}", ptg_arc);
1038 }
1039 };
1040
1041 let ticket =
1046 match ticket_arc_opt
1047 {
1048 Some(ticket) =>
1049 ticket,
1050 None =>
1051 {
1052 let task_thread =
1053 {
1054 self.thread_last_id = (self.thread_last_id + 1) % thread_hndl_cnt;
1064
1065 task_token.clone_thread_handler(self.thread_last_id)
1067 };
1068
1069 let ticket =
1070 Arc::new(NewTaskTicket::new(task_thread, ptg.clone()));
1071
1072 let task =
1073 task_token
1074 .get_task_by_timer_id(timer_fd)
1075 .map_err(|e|
1076 map_timer_err!(e.get_error_type(), "thread: '{}', {}", self.thread_name, e.get_error_msg())
1077 )?;
1078
1079 task.weak_ticket = Arc::downgrade(&ticket);
1080
1081 ticket
1082 }
1083 };
1084
1085 NewTaskTicket::send_task(ticket, overflow_cnt, thread_hndl_cnt);
1086 },
1087 _ =>
1088 {
1089 },
1091 }
1092 } }
1094 else if let Err(TryLockError::WouldBlock) = spti_lock_res
1095 {
1096 if self.thread_run_flag.load(Ordering::Acquire) == false
1097 {
1098 return Ok(());
1099 }
1100
1101 if self.local_thread_inj.is_empty() == true
1102 {
1103 std::thread::park_timeout(Duration::from_secs(2));
1104 }
1105 }
1106
1107 } return Ok(());
1110 }
1111}
1112
1113
1114#[derive(Debug)]
1117struct ThreadHandler
1118{
1119 hndl: JoinHandle<TimerResult<()>>,
1121
1122 task_injector: Arc<Injector<ThreadTask>>,
1124
1125 thread_flag: Weak<AtomicBool>,
1127}
1128
1129impl ThreadHandler
1130{
1131 fn new(hndl: JoinHandle<TimerResult<()>>, task_injector: Arc<Injector<ThreadTask>>, thread_flag: Weak<AtomicBool>) -> Self
1132 {
1133 return
1134 Self
1135 {
1136 hndl,
1137 task_injector,
1138 thread_flag: thread_flag
1139 };
1140 }
1141
1142 fn stop(&self)
1143 {
1144 if let Some(v) = self.thread_flag.upgrade()
1145 {
1146 v.store(false, Ordering::Release);
1147 }
1148 }
1149
1150 fn unpark(&self)
1151 {
1152 self.hndl.thread().unpark();
1153 }
1154
1155 fn send_task(&self, task: Arc<NewTaskTicket>, unpark: bool)
1156 {
1157 self.task_injector.push(ThreadTask::TaskExec(task));
1158
1159 if unpark == true
1160 {
1161 self.hndl.thread().unpark();
1162 }
1163
1164 return;
1165 }
1166
1167 fn clean_local_queue(&self)
1168 {
1169 while let Steal::Success(_) = self.task_injector.steal() {}
1170
1171 return;
1172 }
1173}
1174
1175#[derive(Debug)]
1178pub struct SharedPeriodicTasks
1179{
1180 thread_pool: OnceCell<Arc<Vec<Arc<ThreadHandler>>>>,
1182
1183 tasks_by_timer_fd: HashMap<TimerId, PeriodicTaskTicket>,
1185
1186 tasks_name_to_timer_id: HashMap<String, TimerId>,
1188
1189 timers_poll: TimerPoll,
1191}
1192
1193
1194impl SharedPeriodicTasks
1195{
1196 fn new() -> TimerResult<Self>
1197 {
1198
1199 return Ok(
1200 Self
1201 {
1202 thread_pool:
1203 OnceCell::default(),
1204 tasks_by_timer_fd:
1205 HashMap::new(),
1206 tasks_name_to_timer_id:
1207 HashMap::new(),
1208 timers_poll:
1209 TimerPoll::new()?
1210 }
1211 );
1212 }
1213
1214 fn get_task_by_timer_id(&mut self, timer_fd: TimerId) -> TimerResult<&mut PeriodicTaskTicket>
1216 {
1217 return
1218 self
1219 .tasks_by_timer_fd
1220 .get_mut(&timer_fd)
1221 .ok_or_else(||
1222 map_timer_err!(TimerErrorType::NotFound, "task fd: '{}' was found but task was not found",
1223 timer_fd)
1224 );
1225 }
1226
1227 fn get_task_by_name(&mut self, task_name: &str) -> TimerResult<&mut PeriodicTaskTicket>
1232 {
1233 let res =
1234 self
1235 .tasks_name_to_timer_id
1236 .get(task_name)
1237 .ok_or_else(||
1238 map_timer_err!(TimerErrorType::NotFound, "task: '{}' was not found", task_name)
1239 )
1240 .map(|v|
1241 self
1242 .tasks_by_timer_fd
1243 .get_mut(v)
1244 .ok_or_else(||
1245 map_timer_err!(TimerErrorType::NotFound, "task: '{}' fd: '{}' was found but task was not found",
1246 task_name, v)
1247 )
1248 )??;
1249
1250 return Ok(res);
1251 }
1252
1253 fn clone_thread_handler(&self, thread_last_id: usize) -> Arc<ThreadHandler>
1259 {
1260 let thread_local_hnd =
1261 self
1262 .thread_pool
1263 .get()
1264 .unwrap()[thread_last_id]
1265 .clone();
1266
1267 return thread_local_hnd;
1268 }
1269
1270 fn remove_task(&mut self, task_name: &str) -> TimerResult<PeriodicTaskTicket>
1272 {
1273 let Some(task_timer_fd) = self.tasks_name_to_timer_id.remove(task_name)
1274 else
1275 {
1276 timer_err!(TimerErrorType::NotFound, "task: '{}' was not found", task_name);
1277 };
1278
1279 let Some(ptt) = self.tasks_by_timer_fd.remove(&task_timer_fd)
1280 else
1281 {
1282 timer_err!(TimerErrorType::NotFound, "task: '{}' fd: '{}' was found but task was not found",
1283 task_name, task_timer_fd);
1284 };
1285
1286 return Ok(ptt);
1287 }
1288
1289 fn contains_task(&self, task_name: &str) -> bool
1291 {
1292 return self.tasks_name_to_timer_id.contains_key(task_name);
1293 }
1294
1295 fn insert_task(&mut self, period_task_ticket: PeriodicTaskTicket)
1297 {
1298 let timer_id = period_task_ticket.sync_timer.get_inner().as_timer_id();
1299
1300 self.tasks_name_to_timer_id.insert(period_task_ticket.task_name.clone(), timer_id);
1301 self.tasks_by_timer_fd.insert(timer_id, period_task_ticket);
1302
1303 return;
1304 }
1305}
1306
1307#[derive(Debug)]
1309pub struct SyncPeriodicTasksInner
1310{
1311 poll_int: PollInterrupt,
1313
1314 task_injector: Arc<Injector<GlobalTasks>>,
1316
1317 error_journal: Option<Mutex<Receiver<TimerError>>>,
1319}
1320
1321impl SyncPeriodicTasksInner
1322{
1323 fn send_global_cmd(&self, glob: GlobalTasks) -> TimerResult<()>
1324 {
1325 let poll_int =
1326 self.poll_int.aquire()?;
1327
1328 self.task_injector.push(glob);
1329
1330 poll_int.interrupt_drop()?;
1331
1332 return Ok(());
1333 }
1334
1335 fn clear_global_queue(&self)
1336 {
1337 while let Steal::Success(_) = self.task_injector.steal() {}
1338
1339 return;
1340 }
1341}
1342
1343struct SyncCallableOper
1345{
1346 op: Box<dyn FnMut() -> PeriodicTaskResult>,
1347}
1348
1349unsafe impl Send for SyncCallableOper {}
1350
1351impl PeriodicTask for SyncCallableOper
1352{
1353 fn exec(&mut self) -> PeriodicTaskResult
1354 {
1355 return (self.op)();
1356 }
1357}
1358
1359#[derive(Debug, Clone)]
1371pub struct SyncPeriodicTasks
1372{
1373 threads: Option<Arc<Vec<Arc<ThreadHandler>>>>,
1374
1375 inner: Arc<SyncPeriodicTasksInner>,
1376}
1377
1378impl Drop for SyncPeriodicTasks
1379{
1380 fn drop(&mut self)
1381 {
1382 self.inner.clear_global_queue();
1383
1384 let mut threads = self.threads.take().unwrap();
1385
1386 for thread in threads.iter()
1388 {
1389 thread.stop();
1390 thread.unpark();
1391 }
1392
1393 let _ = self.inner.poll_int.aquire().map(|v| v.interrupt_drop());
1395
1396 for _ in 0..5
1397 {
1398 let threads_unwr =
1399 match Arc::try_unwrap(threads)
1400 {
1401 Ok(r) => r,
1402 Err(e) =>
1403 {
1404 threads = e;
1405
1406 std::thread::sleep(Duration::from_millis(500));
1407
1408 continue;
1409
1410 }
1411 };
1412
1413 for thread in threads_unwr
1414 {
1415 thread.clean_local_queue();
1416
1417 let Some(thread) = Arc::into_inner(thread)
1418 else
1419 {
1420 panic!("assertion trap: ~SyncPeriodicTasks, a reference to ThreadHandler left somewhere");
1421 };
1422
1423 let _ = thread.hndl.join();
1424 }
1425
1426 break;
1427 }
1428 }
1429}
1430
1431
1432impl SyncPeriodicTasks
1433{
1434
1435
1436 pub
1449 fn new(threads_cnt: NonZeroUsize, error_report: bool) -> TimerResult<Self>
1450 {
1451 let spti = SharedPeriodicTasks::new()?;
1452 let poll_int = spti.timers_poll.get_poll_interruptor();
1453
1454 let spti = Arc::new(Mutex::new(spti));
1456
1457 let task_injector = Arc::new(Injector::<GlobalTasks>::new());
1458
1459 let mut thread_hndls: Vec<Arc<ThreadHandler>> = Vec::with_capacity(threads_cnt.get());
1460
1461 let err_journal =
1463 if error_report == true
1464 {
1465 Some(mpsc::channel::<TimerError>())
1466 }
1467 else
1468 {
1469 None
1470 };
1471
1472 for i in 0..threads_cnt.get()
1474 {
1475 let handler =
1476 ThreadWorker::new(
1477 format!("timer_exec/{}s", i),
1478 task_injector.clone(),
1479 spti.clone(),
1480 poll_int.clone(),
1481 err_journal.as_ref().map(|c| c.0.clone())
1482 )?;
1483
1484 thread_hndls.push(Arc::new(handler));
1485 }
1486
1487 let thread_hndls = Arc::new(thread_hndls);
1488
1489 let spti_lock = spti.lock().unwrap();
1491 spti_lock.thread_pool.get_or_init(|| thread_hndls.clone());
1492
1493 let thread =
1495 spti_lock
1496 .thread_pool
1497 .get()
1498 .unwrap()
1499 .get(random_range(0..threads_cnt.get()))
1500 .unwrap()
1501 .clone();
1502
1503 drop(spti_lock);
1504
1505 thread.unpark();
1507
1508 let inner =
1509 SyncPeriodicTasksInner
1510 {
1511 poll_int:
1512 poll_int,
1513 task_injector:
1514 task_injector,
1515 error_journal:
1516 err_journal.map(|j| Mutex::new(j.1))
1517 };
1518
1519 return Ok(
1520 Self
1521 {
1522 threads: Some(thread_hndls),
1523 inner: Arc::new(inner),
1524 }
1525 );
1526 }
1527
1528 pub
1593 fn add<T>(&self, task_name: impl Into<String>, task: T, task_time: PeriodicTaskTime) -> TimerResult<PeriodicTaskGuard>
1594 where T: PeriodicTask
1595 {
1596 let task_int: PeriodicTaskHndl = Box::new(task);
1597
1598 let task_name_str: String = task_name.into();
1599
1600 let period_task_guard =
1601 Arc::new(PeriodicTaskGuardInner::new(task_name_str.clone(), task_int)?);
1602
1603
1604 let (mpsc_send, mpsc_recv) = mpsc::channel();
1608
1609 self.inner.send_global_cmd(GlobalTasks::AddTask(task_name_str.clone(), task_time, period_task_guard.clone(), Some(mpsc_send)) )?;
1610
1611 let _ =
1612 mpsc_recv
1613 .recv()
1614 .map_err(|e|
1615 map_timer_err!(TimerErrorType::ExternalError, "mpsc error: {}", e)
1616 )??;
1617
1618 let ret =
1619 PeriodicTaskGuard
1620 {
1621 task_name: task_name_str,
1622 guard: Some(period_task_guard),
1623 spt: self.inner.clone()
1624 };
1625
1626 return Ok(ret);
1627 }
1628
1629 pub
1667 fn add_closure<F>(&self, task_name: impl Into<String>, task_time: PeriodicTaskTime, clo: F) -> TimerResult<PeriodicTaskGuard>
1668 where F: 'static + FnMut() -> PeriodicTaskResult + Send
1669 {
1670 let closure_task = SyncCallableOper{ op: Box::new(clo) };
1671
1672 return self.add(task_name, closure_task, task_time);
1673 }
1674 pub
1684 fn check_thread_status(&self) -> Option<String>
1685 {
1686 for thread in self.threads.as_ref().unwrap().iter()
1687 {
1688 if let None = thread.thread_flag.upgrade()
1689 {
1690 return Some(thread.hndl.thread().name().unwrap().to_string());
1691 }
1692 }
1693
1694 return None;
1695 }
1696
1697 pub
1706 fn read_error(&self) -> Option<TimerError>
1707 {
1708 let Some(rx) = self.inner.error_journal.as_ref()
1709 else { return None };
1710
1711 let Ok(err) =
1712 rx
1713 .lock()
1714 .unwrap_or_else(|e| e.into_inner())
1715 .recv_timeout(Duration::from_secs(0))
1716 else { return None };
1717
1718 return Some(err);
1719 }
1720}
1721
1722#[cfg(test)]
1723mod tests
1724{
1725 use core::fmt;
1726 use std::{sync::mpsc::{self, RecvTimeoutError, Sender}, time::{Duration, Instant}};
1727
1728 use crate::{periodic_task::sync_tasks::{PeriodicTask, PeriodicTaskResult, PeriodicTaskTime, SyncPeriodicTasks}, AbsoluteTime, RelativeTime};
1729
1730 #[derive(Debug)]
1731 struct TaskStruct1
1732 {
1733 a1: u64,
1734 s: Sender<u64>,
1735 }
1736
1737 impl TaskStruct1
1738 {
1739 fn new(a1: u64, s: Sender<u64>) -> Self
1740 {
1741 return Self{ a1: a1, s };
1742 }
1743 }
1744
1745 impl PeriodicTask for TaskStruct1
1746 {
1747 fn exec(&mut self) -> PeriodicTaskResult
1748 {
1749 println!("taskstruct1 val: {}", self.a1);
1750
1751 let _ = self.s.send(self.a1);
1752
1753 return PeriodicTaskResult::Ok;
1754 }
1755 }
1756
1757 #[derive(Debug)]
1758 struct TaskStruct2
1759 {
1760 a1: u64,
1761 s: Sender<u64>,
1762 }
1763
1764 impl TaskStruct2
1765 {
1766 fn new(a1: u64, s: Sender<u64>) -> Self
1767 {
1768 return Self{ a1: a1, s };
1769 }
1770 }
1771
1772 impl PeriodicTask for TaskStruct2
1773 {
1774 fn exec(&mut self) -> PeriodicTaskResult
1775 {
1776 println!("taskstruct2 val: {}", self.a1);
1777
1778 self.s.send(self.a1).unwrap();
1779
1780 return PeriodicTaskResult::TaskReSchedule(PeriodicTaskTime::exact_time(AbsoluteTime::now() + RelativeTime::new_time(2, 0)));
1781 }
1782 }
1783
1784 impl<F> PeriodicTask for F
1785 where F: 'static + FnMut() + Send + fmt::Debug
1786 {
1787 fn exec(&mut self) -> PeriodicTaskResult
1788 {
1789 (self)();
1790 return PeriodicTaskResult::Ok;
1791 }
1792 }
1793
1794 #[test]
1795 fn ttt()
1796 {
1797 let s =
1798 SyncPeriodicTasks::new(1.try_into().unwrap(), true).unwrap();
1799
1800 let task1_ptt =
1801 PeriodicTaskTime
1802 ::exact_time(AbsoluteTime::now() + RelativeTime::new_time(3, 0));
1803
1804 let (send, recv) = mpsc::channel::<u64>();
1805
1806 let task1_guard =
1807 s.add_closure("task2", task1_ptt,
1808 move ||
1809 {
1810 println!("test output");
1811 send.send(2).unwrap();
1812
1813 return PeriodicTaskResult::Ok;
1814 }
1815 ).unwrap();
1816
1817
1818 println!("added");
1819
1820 let val = recv.recv_timeout(Duration::from_millis(4000));
1821
1822 if val.is_err() == true
1823 {
1824 let e = s.read_error();
1825 println!("ERROR, {:?}",e);
1826
1827 assert_eq!(true, false, "{:?}", e);
1828 }
1829
1830 println!("val: {:?}", val);
1831
1832 assert_eq!(Ok(2), val);
1833
1834 drop(task1_guard);
1835 }
1836
1837 #[test]
1838 fn test1_absolute_simple()
1839 {
1840 let s = SyncPeriodicTasks::new(1.try_into().unwrap(), false).unwrap();
1841
1842 let (send, recv) = mpsc::channel::<u64>();
1843
1844 let task1 = TaskStruct1::new(2, send);
1845 let task1_ptt = PeriodicTaskTime::exact_time(AbsoluteTime::now() + RelativeTime::new_time(3, 0));
1846 let task1_guard = s.add("task1", task1, task1_ptt).unwrap();
1847
1848 println!("added");
1849 let val = recv.recv();
1850
1851 println!("{:?}", val);
1852
1853 drop(task1_guard);
1854 }
1855
1856 #[test]
1857 fn test1_relative_simple()
1858 {
1859 let s = SyncPeriodicTasks::new(1.try_into().unwrap(), false).unwrap();
1860
1861 let (send, recv) = mpsc::channel::<u64>();
1862
1863 let task1 = TaskStruct1::new(2, send);
1864 let task1_ptt = PeriodicTaskTime::interval(RelativeTime::new_time(1, 0));
1865 let task1_guard = s.add("task1", task1, task1_ptt).unwrap();
1866
1867 let mut s = Instant::now();
1868
1869 for i in 0..3
1870 {
1871 let val = recv.recv().unwrap();
1872
1873 let e = s.elapsed();
1874 s = Instant::now();
1875
1876 println!("{}: {:?} {:?} {}", i, val, e, e.as_micros());
1877
1878 assert!(999000 < e.as_micros() && e.as_micros() < 10001200);
1879 assert_eq!(val, 2);
1880 }
1881
1882 drop(task1_guard);
1883
1884 std::thread::sleep(Duration::from_millis(100));
1885
1886 return;
1887 }
1888
1889 #[test]
1890 fn test1_relative_resched_to_abs()
1891 {
1892 let s = SyncPeriodicTasks::new(1.try_into().unwrap(), false).unwrap();
1893
1894 let (send, recv) = mpsc::channel::<u64>();
1895
1896 let task1 = TaskStruct2::new(2, send);
1897 let task1_ptt = PeriodicTaskTime::interval(RelativeTime::new_time(1, 0));
1898 let task1_guard = s.add("task1", task1, task1_ptt).unwrap();
1899
1900 let s = Instant::now();
1901 match recv.recv_timeout(Duration::from_millis(1150))
1902 {
1903 Ok(rcv_a) =>
1904 {
1905 let e = s.elapsed();
1906 println!("{:?} {}", e, e.as_micros());
1907 assert_eq!(rcv_a, 2);
1908 assert!(990051 < e.as_micros() && e.as_micros() < 1020551);
1909 },
1910 Err(RecvTimeoutError::Timeout) =>
1911 panic!("tineout"),
1912 Err(e) =>
1913 panic!("{}", e),
1914 }
1915
1916 let s = Instant::now();
1917 match recv.recv_timeout(Duration::from_millis(2100))
1918 {
1919 Ok(rcv_a) =>
1920 {
1921 let e = s.elapsed();
1922 println!("{:?} {}", e, e.as_micros());
1923 assert_eq!(rcv_a, 2);
1924 assert!(1999642 < e.as_micros() && e.as_micros() < 2008342);
1925 },
1926 Err(RecvTimeoutError::Timeout) =>
1927 panic!("tineout"),
1928 Err(e) =>
1929 panic!("{}", e),
1930 }
1931
1932 let s = Instant::now();
1933 match recv.recv_timeout(Duration::from_millis(2100))
1934 {
1935 Ok(rcv_a) =>
1936 {
1937 let e = s.elapsed();
1938 println!("{:?} {}", e, e.as_micros());
1939 assert_eq!(rcv_a, 2);
1940 assert!(1999642 < e.as_micros() && e.as_micros() < 2003342);
1941 },
1942 Err(RecvTimeoutError::Timeout) =>
1943 panic!("tineout"),
1944 Err(e) =>
1945 panic!("{}", e),
1946 }
1947
1948 drop(task1_guard);
1949
1950 std::thread::sleep(Duration::from_millis(100));
1951
1952 return;
1953 }
1954
1955 #[test]
1956 fn test1_relative_simple_resched()
1957 {
1958 let s = SyncPeriodicTasks::new(1.try_into().unwrap(), false).unwrap();
1959
1960 let (send, recv) = mpsc::channel::<u64>();
1961
1962 let task1 = TaskStruct1::new(2, send);
1963 let task1_ptt =
1964 PeriodicTaskTime::interval(
1965 RelativeTime::new_time(1, 0)
1966 );
1967 let task1_guard = s.add("task1", task1, task1_ptt).unwrap();
1968
1969 let mut s = Instant::now();
1970
1971 for i in 0..3
1972 {
1973 let val = recv.recv().unwrap();
1974
1975 let e = s.elapsed();
1976 s = Instant::now();
1977
1978 println!("{}: {:?} {:?} {}", i, val, e, e.as_micros());
1979
1980 assert!(990000 < e.as_micros() && e.as_micros() < 10001200);
1981 assert_eq!(val, 2);
1982 }
1983
1984 task1_guard
1985 .reschedule_task(
1986 PeriodicTaskTime::exact_time(AbsoluteTime::now() + RelativeTime::new_time(2, 0))
1987 )
1988 .unwrap();
1989
1990 s = Instant::now();
1991 let val = recv.recv().unwrap();
1992 let e = s.elapsed();
1993
1994 println!("resched: {:?} {:?} {}", val, e, e.as_micros());
1995
1996 assert!(1990000 < e.as_micros() && e.as_micros() < 2003560);
1997
1998 let val = recv.recv_timeout(Duration::from_secs(3));
1999
2000 assert_eq!(val.is_err(), true);
2001 assert_eq!(val.err().unwrap(), RecvTimeoutError::Timeout);
2002
2003 drop(task1_guard);
2004
2005 std::thread::sleep(Duration::from_millis(100));
2006
2007 return;
2008 }
2009
2010 #[test]
2011 fn test1_relative_simple_cancel()
2012 {
2013 let s = SyncPeriodicTasks::new(1.try_into().unwrap(), false).unwrap();
2014
2015 let (send, recv) = mpsc::channel::<u64>();
2016
2017 let task1 = TaskStruct1::new(0, send.clone());
2018 let task1_ptt =
2019 PeriodicTaskTime::interval(RelativeTime::new_time(1, 0));
2020
2021 let task2 = TaskStruct1::new(1, send.clone());
2022 let task2_ptt =
2023 PeriodicTaskTime::interval(RelativeTime::new_time(2, 0));
2024
2025 let task3 = TaskStruct1::new(2, send);
2026 let task3_ptt =
2027 PeriodicTaskTime::interval(RelativeTime::new_time(0, 500_000_000));
2028
2029 let task1_guard = s.add("task1", task1, task1_ptt).unwrap();
2030 let task2_guard = s.add("task2", task2, task2_ptt).unwrap();
2031 let task3_guard = s.add("task3", task3, task3_ptt).unwrap();
2032
2033 let mut a_cnt: [u8; 3] = [0_u8; 3];
2034
2035 let end = AbsoluteTime::now() + RelativeTime::new_time(5, 100_000_000);
2036
2037 while AbsoluteTime::now() < end
2038 {
2039 match recv.recv_timeout(Duration::from_millis(1))
2040 {
2041 Ok(rcv_a) =>
2042 a_cnt[rcv_a as usize] += 1,
2043 Err(RecvTimeoutError::Timeout) =>
2044 continue,
2045 Err(e) =>
2046 panic!("{}", e),
2047 }
2048
2049
2050 }
2051
2052 assert_eq!(a_cnt[0], 5);
2053 assert_eq!(a_cnt[1], 2);
2054 assert_eq!(a_cnt[2], 10);
2055
2056 task3_guard.suspend_task().unwrap();
2058
2059
2060 let end = AbsoluteTime::now() + RelativeTime::new_time(5, 100_000_000);
2061
2062 while AbsoluteTime::now() < end
2063 {
2064 match recv.recv_timeout(Duration::from_millis(1))
2065 {
2066 Ok(rcv_a) =>
2067 a_cnt[rcv_a as usize] += 1,
2068 Err(RecvTimeoutError::Timeout) =>
2069 continue,
2070 Err(e) =>
2071 panic!("{}", e),
2072 }
2073
2074
2075 }
2076
2077 assert_eq!(a_cnt[0] > 5, true);
2078 assert_eq!(a_cnt[1] > 2, true);
2079 assert!((a_cnt[2] == 10 || a_cnt[2] == 11));
2080
2081 drop(task1_guard);
2082 drop(task2_guard);
2083 drop(task3_guard);
2084
2085 let end = AbsoluteTime::now() + RelativeTime::new_time(5, 100_000_000);
2086
2087 while AbsoluteTime::now() < end
2088 {
2089 match recv.recv_timeout(Duration::from_millis(1))
2090 {
2091 Ok(rcv_a) =>
2092 a_cnt[rcv_a as usize] += 1,
2093 Err(RecvTimeoutError::Timeout) =>
2094 continue,
2095 Err(_) =>
2096 break,
2097 }
2098
2099
2100 }
2101
2102 assert_eq!(AbsoluteTime::now() < end, true);
2103
2104
2105
2106 return;
2107 }
2108
2109 #[test]
2111 fn test2_multithread_1()
2112 {
2113 let s = SyncPeriodicTasks::new(2.try_into().unwrap(), false).unwrap();
2114
2115 let (send, recv) = mpsc::channel::<u64>();
2116
2117 let task1 = TaskStruct1::new(0, send.clone());
2118 let task1_ptt =
2119 PeriodicTaskTime::interval(RelativeTime::new_time(1, 0));
2120
2121 let task2 = TaskStruct1::new(1, send.clone());
2122 let task2_ptt =
2123 PeriodicTaskTime::interval(RelativeTime::new_time(2, 0));
2124
2125 let task3 = TaskStruct1::new(2, send.clone());
2126 let task3_ptt =
2127 PeriodicTaskTime::interval(RelativeTime::new_time(0, 500_000_000));
2128
2129 let task4 = TaskStruct1::new(3, send.clone());
2130 let task4_ptt =
2131 PeriodicTaskTime::interval(RelativeTime::new_time(0, 200_000_000));
2132
2133 let task5 = TaskStruct1::new(4, send.clone());
2134 let task5_ptt =
2135 PeriodicTaskTime::exact_time(AbsoluteTime::now() + RelativeTime::new_time(5, 0));
2136
2137 let task1_guard = s.add("task1", task1, task1_ptt).unwrap();
2138 let task2_guard = s.add("task2", task2, task2_ptt).unwrap();
2139 let task3_guard = s.add("task3", task3, task3_ptt).unwrap();
2140 let task4_guard = s.add("task4", task4, task4_ptt).unwrap();
2141 let task5_guard = s.add("task5", task5, task5_ptt).unwrap();
2142
2143
2144 let mut a_cnt: [u8; 5] = [0_u8; 5];
2145
2146 let end = AbsoluteTime::now() + RelativeTime::new_time(5, 500_000_000);
2147
2148 while AbsoluteTime::now() < end
2149 {
2150 match recv.recv_timeout(Duration::from_millis(1))
2151 {
2152 Ok(rcv_a) =>
2153 a_cnt[rcv_a as usize] += 1,
2154 Err(RecvTimeoutError::Timeout) =>
2155 continue,
2156 Err(e) =>
2157 panic!("{}", e),
2158 }
2159
2160
2161 }
2162
2163 println!("{:?}", a_cnt);
2164
2165 assert!(a_cnt[0] == 5);
2166 assert!(a_cnt[1] == 2);
2167 assert!((a_cnt[2] == 10 || a_cnt[2] == 11));
2168 assert!(a_cnt[3] == 27);
2169 assert!(a_cnt[4] == 1);
2170
2171 task5_guard.reschedule_task(PeriodicTaskTime::exact_time(AbsoluteTime::now() + RelativeTime::new_time(0, 500_000_000))).unwrap();
2172
2173 let end = AbsoluteTime::now() + RelativeTime::new_time(0, 600_000_000);
2174
2175 while AbsoluteTime::now() < end
2176 {
2177 match recv.recv_timeout(Duration::from_millis(1))
2178 {
2179 Ok(rcv_a) =>
2180 a_cnt[rcv_a as usize] += 1,
2181 Err(RecvTimeoutError::Timeout) =>
2182 continue,
2183 Err(e) =>
2184 panic!("{}", e),
2185 }
2186 }
2187
2188 println!("{:?}", a_cnt);
2189 assert!(a_cnt[4] == 2);
2190
2191 drop(task5_guard);
2192 drop(task4_guard);
2193 drop(task3_guard);
2194 drop(task2_guard);
2195
2196 let end = AbsoluteTime::now() + RelativeTime::new_time(2, 1000);
2197
2198 while AbsoluteTime::now() < end
2199 {
2200 match recv.recv_timeout(Duration::from_millis(1))
2201 {
2202 Ok(rcv_a) =>
2203 a_cnt[rcv_a as usize] += 1,
2204 Err(RecvTimeoutError::Timeout) =>
2205 continue,
2206 Err(e) =>
2207 panic!("{}", e),
2208 }
2209 }
2210
2211
2212 println!("{:?}", a_cnt);
2213 assert_eq!(a_cnt[4], 2);
2214 assert_eq!(a_cnt[0], 8);
2215
2216 drop(task1_guard);
2217
2218 std::thread::sleep(Duration::from_millis(10));
2219 return;
2220 }
2221}