1use std::
19{
20 cell::OnceCell,
21 cmp,
22 collections::HashMap,
23 fmt,
24 num::NonZeroUsize,
25 os::fd::{AsRawFd, RawFd},
26 sync::
27 {
28 Arc, Mutex, TryLockError, Weak, atomic::{AtomicBool, Ordering}, mpsc::{self}
29 },
30 thread::JoinHandle,
31 time::Duration
32};
33
34use crossbeam_deque::{Injector, Steal};
35use rand::random_range;
36
37use crate::
38{
39 AbsoluteTime,
40 FdTimerCom,
41 RelativeTime,
42 TimerPoll,
43 TimerReadRes,
44 error::{TimerError, TimerErrorType, TimerResult},
45 map_timer_err, timer_err,
46 timer_portable::
47 {
48 PollEventType, PolledTimerFd, TimerExpMode, TimerFlags, TimerType, poll::PollInterrupt, timer::TimerFd
49 }
50};
51
52#[derive(Debug, Clone, Copy, PartialEq, Eq)]
55pub enum PeriodicTaskResult
56{
57 Ok,
59
60 CancelTask,
66
67 TaskReSchedule(PeriodicTaskTime)
70}
71
72pub trait PeriodicTask: Send + fmt::Debug + 'static
82{
83 fn exec(&mut self) -> PeriodicTaskResult;
85}
86
87pub type PeriodicTaskHndl = Box<dyn PeriodicTask>;
89
90
91#[derive(Debug)]
96pub(crate) enum GlobalTasks
97{
98 AddTask( String, PeriodicTaskTime, Arc<PeriodicTaskGuardInner>, Option<mpsc::Sender<TimerResult<()>>> ),
101
102 RemoveTask( Arc<PeriodicTaskGuardInner>, Option<mpsc::Sender<TimerResult<()>>> ),
104
105 ReschedTask( Weak<PeriodicTaskGuardInner>, PeriodicTaskTime, Option<mpsc::Sender<TimerResult<()>>> ),
107
108 SuspendTask( Weak<PeriodicTaskGuardInner>, Option<mpsc::Sender<TimerResult<()>>> ),
111
112 ResumeTask( Weak<PeriodicTaskGuardInner>, Option<mpsc::Sender<TimerResult<()>>> ),
114}
115
116#[derive(Debug)]
118pub enum ThreadTask
119{
120 TaskExec( Arc<NewTaskTicket> )
122}
123
124#[derive(Debug)]
128pub struct NewTaskTicket
129{
130 task_thread: Arc<ThreadHandler>,
132 ptgi: Weak<PeriodicTaskGuardInner>,
133}
134
135impl NewTaskTicket
136{
137 fn new(task_thread: Arc<ThreadHandler>, ptgi: Weak<PeriodicTaskGuardInner>) -> Self
138 {
139 return
140 Self
141 {
142 task_thread:
143 task_thread,
144 ptgi:
145 ptgi
146 };
147 }
148
149 fn send_task(this: Arc<NewTaskTicket>, task_rep_count: u64, thread_hndl_cnt: usize)
151 {
152 let strong_cnt = Arc::strong_count(&this);
153 let thread = this.task_thread.clone();
154
155 for _ in 0..task_rep_count
156 {
157 thread.send_task(this.clone(), strong_cnt < 2 && thread_hndl_cnt > 1);
158 }
159 }
160}
161
162#[derive(Debug)]
165pub(crate) struct PeriodicTaskTicket
166{
167 task_name: String,
169
170 sync_timer: PolledTimerFd<TimerFd>,
171
172 ptt: PeriodicTaskTime,
174
175 weak_ticket: Weak<NewTaskTicket>,
179
180 ptg: Weak<PeriodicTaskGuardInner>,
183}
184
185impl Ord for PeriodicTaskTicket
186{
187 fn cmp(&self, other: &Self) -> cmp::Ordering
188 {
189 return
190 self
191 .sync_timer
192 .get_inner()
193 .as_raw_fd()
194 .cmp(&other.sync_timer.get_inner().as_raw_fd());
195 }
196}
197
198impl PartialOrd for PeriodicTaskTicket
199{
200 fn partial_cmp(&self, other: &Self) -> Option<cmp::Ordering>
201 {
202 return Some(self.cmp(other));
203 }
204}
205
206impl PartialEq for PeriodicTaskTicket
207{
208 fn eq(&self, other: &Self) -> bool
209 {
210 return
211 self.task_name == other.task_name &&
212 self.sync_timer.get_inner().as_raw_fd() == other.sync_timer.get_inner().as_raw_fd();
213 }
214}
215
216impl Eq for PeriodicTaskTicket {}
217
218impl PeriodicTaskTicket
219{
220 fn new(task_name: String, ptt: PeriodicTaskTime, ptg: Arc<PeriodicTaskGuardInner>, poll: &TimerPoll) -> TimerResult<Self>
221 {
222 let sync_timer =
224 TimerFd::new(task_name.clone().into(), TimerType::CLOCK_REALTIME,
225 TimerFlags::TFD_CLOEXEC | TimerFlags::TFD_NONBLOCK)
226 .map_err(|e|
227 map_timer_err!(TimerErrorType::TimerError(e.get_errno()), "cannot setup timer for task: '{}'", task_name)
228 )
229 .and_then(|timer|
230 poll.add(timer)
231 )?;
232
233 let ptt =
234 Self
235 {
236 task_name: task_name,
237 sync_timer: sync_timer,
238 ptt: ptt,
239 weak_ticket: Weak::new(),
240 ptg: Arc::downgrade(&ptg),
241 };
242
243 ptt.set_timer()?;
244
245 return Ok(ptt);
246 }
247
248 #[inline]
249 fn set_timer(&self) -> TimerResult<()>
250 {
251 return self.get_timer_time().set_timer(self.sync_timer.get_inner());
252 }
253
254 #[inline]
255 fn unset_timer(&self) -> TimerResult<()>
256 {
257 return
258 self
259 .sync_timer
260 .get_inner()
261 .get_timer()
262 .unset_time()
263 .map_err(|e|
264 map_timer_err!(TimerErrorType::TimerError(e.get_errno()), "unsetting timer '{}' returned error: {}", self.sync_timer, e)
265 );
266 }
267
268 fn update_task_time(&mut self, ptt_new: PeriodicTaskTime) -> TimerResult<()>
269 {
270 self.ptt = ptt_new;
271
272 return self.set_timer();
273 }
274
275 fn get_task_guard(&self) -> TimerResult<Arc<PeriodicTaskGuardInner>>
276 {
277 return
278 self
279 .ptg
280 .upgrade()
281 .ok_or_else(||
282 map_timer_err!(TimerErrorType::ReferenceGone, "task: '{}' reference to timer has gone",
283 self.task_name)
284 );
285 }
286
287 #[inline]
288 fn get_timer_time(&self) -> &PeriodicTaskTime
289 {
290 return &self.ptt;
291 }
292}
293
294#[derive(Debug)]
298pub struct PeriodicTaskGuard
299{
300 task_name: String,
302
303 guard: Option<Arc<PeriodicTaskGuardInner>>,
311
312 spt: Arc<SyncPeriodicTasksInner>
315}
316
317impl Drop for PeriodicTaskGuard
318{
319 fn drop(&mut self)
320 {
321 let guard = self.guard.take().unwrap();
322
323 let _ = self.spt.send_global_cmd(GlobalTasks::RemoveTask(guard, None));
324
325 return;
326 }
327}
328
329impl PeriodicTaskGuard
330{
331 pub
359 fn reschedule_task(&self, ptt: PeriodicTaskTime) -> TimerResult<()>
360 {
361 let weak_ptgi = Arc::downgrade(self.guard.as_ref().unwrap());
362
363 let (snd, rcv) = mpsc::channel::<Result<(), TimerError>>();
364
365 self.spt.send_global_cmd(GlobalTasks::ReschedTask(weak_ptgi, ptt, Some(snd)))?;
366
367 return
368 rcv
369 .recv_timeout(Duration::from_secs(10))
370 .map_err(|e|
371 map_timer_err!(TimerErrorType::MpscTimeout, "reschedule_task(), task name: '{}', MPSC rcv timeout error: '{}'",
372 self.task_name, e)
373 )?;
374 }
375
376 pub
396 fn suspend_task(&self) -> TimerResult<()>
397 {
398 let weak_ptgi = Arc::downgrade(self.guard.as_ref().unwrap());
399
400 let (snd, rcv) = mpsc::channel::<Result<(), TimerError>>();
401
402 self.spt.send_global_cmd(GlobalTasks::SuspendTask(weak_ptgi, Some(snd)))?;
403
404 return
405 rcv
406 .recv_timeout(Duration::from_secs(10))
407 .map_err(|e|
408 map_timer_err!(TimerErrorType::MpscTimeout, "suspend_task(), task name: '{}', MPSC rcv timeout error: '{}'",
409 self.task_name, e)
410 )?;
411 }
412
413 pub
433 fn resume_task(&self) -> TimerResult<()>
434 {
435 let weak_ptgi = Arc::downgrade(self.guard.as_ref().unwrap());
436
437 let (snd, rcv) = mpsc::channel::<Result<(), TimerError>>();
438
439 self.spt.send_global_cmd(GlobalTasks::ResumeTask(weak_ptgi, Some(snd)))?;
440
441 return
442 rcv
443 .recv_timeout(Duration::from_secs(10))
444 .map_err(|e|
445 map_timer_err!(TimerErrorType::MpscTimeout, "resume_task(), task name: '{}', MPSC rcv timeout error: '{}'",
446 self.task_name, e)
447 )?;
448 }
449}
450
451#[derive(Debug, Clone, Copy, PartialEq, Eq)]
455pub enum PeriodicTaskTime
456{
457 Absolute(TimerExpMode<AbsoluteTime>),
459
460 Relative(TimerExpMode<RelativeTime>),
462}
463
464impl From<TimerExpMode<AbsoluteTime>> for PeriodicTaskTime
465{
466 fn from(value: TimerExpMode<AbsoluteTime>) -> Self
467 {
468 return Self::Absolute(value);
469 }
470}
471
472impl From<TimerExpMode<RelativeTime>> for PeriodicTaskTime
473{
474 fn from(value: TimerExpMode<RelativeTime>) -> Self
475 {
476 return Self::Relative(value);
477 }
478}
479
480impl fmt::Display for PeriodicTaskTime
481{
482 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result
483 {
484 match self
485 {
486 Self::Absolute(t) =>
487 write!(f, "{}", t),
488 Self::Relative(t) =>
489 write!(f, "{}", t),
490 }
491 }
492}
493
494impl PeriodicTaskTime
495{
496 #[inline]
499 pub
500 fn exact_time(abs_time: AbsoluteTime) -> Self
501 {
502 return Self::Absolute(TimerExpMode::<AbsoluteTime>::new_oneshot(abs_time));
503 }
504
505 #[inline]
507 pub
508 fn interval(rel_time: RelativeTime) -> Self
509 {
510 return Self::Relative(TimerExpMode::<RelativeTime>::new_interval(rel_time));
511 }
512
513 #[inline]
522 pub
523 fn interval_with_start_delay(start_del_time: RelativeTime, rel_int_time: RelativeTime) -> Self
524 {
525 return Self::Relative(TimerExpMode::<RelativeTime>::new_interval_with_init_delay(start_del_time, rel_int_time));
526 }
527
528 fn set_timer(&self, timer_fd: &TimerFd) -> TimerResult<()>
530 {
531 match *self
532 {
533 Self::Absolute(timer_exp_mode) =>
534 return
535 timer_fd
536 .get_timer()
537 .set_time(timer_exp_mode)
538 .map_err(|e|
539 map_timer_err!(TimerErrorType::TimerError(e.get_errno()), "cannot set time '{}' for timer: '{}'", timer_exp_mode, timer_fd )
540 ),
541 Self::Relative(timer_exp_mode) =>
542 return
543 timer_fd
544 .get_timer()
545 .set_time(timer_exp_mode)
546 .map_err(|e|
547 map_timer_err!(TimerErrorType::TimerError(e.get_errno()), "cannot set time '{}' for timer: '{}'", timer_exp_mode, timer_fd )
548 ),
549 }
550 }
551}
552
553
554#[derive(Debug)]
556pub(crate) struct PeriodicTaskGuardInner
557{
558 task_name: String,
559
560 task: Mutex<PeriodicTaskHndl>,
566}
567
568impl fmt::Display for PeriodicTaskGuardInner
569{
570 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result
571 {
572 write!(f, "task name: '{}'", self.task_name)
573 }
574}
575
576impl PeriodicTaskGuardInner
577{
578 fn new(task_name: String, task_inst: PeriodicTaskHndl) -> TimerResult<Self>
579 {
580 return Ok(Self { task_name: task_name, task: Mutex::new(task_inst) });
581 }
582
583
584}
585
586struct ThreadWorker
588{
589 thread_name: String,
591
592 global_task_injector: Arc<Injector<GlobalTasks>>,
594
595 local_thread_inj: Arc<Injector<ThreadTask>>,
597
598 thread_run_flag: Arc<AtomicBool>,
601
602 spti: Arc<Mutex<SharedPeriodicTasks>>,
607
608 thread_last_id: usize,
610
611 poll_int: PollInterrupt
613}
614
615impl ThreadWorker
616{
617 fn new(thread_name: String, global_task_injector: Arc<Injector<GlobalTasks>>,
618 spti: Arc<Mutex<SharedPeriodicTasks>>, poll_int: PollInterrupt) -> TimerResult<ThreadHandler>
619 {
620 let local_thread_inj = Arc::new(Injector::<ThreadTask>::new());
621 let thread_run_flag = Arc::new(AtomicBool::new(true));
622 let thread_run_flag_weak = Arc::downgrade(&thread_run_flag);
623
624 let worker =
625 ThreadWorker
626 {
627 thread_name:
628 thread_name.clone(),
629 global_task_injector:
630 global_task_injector,
631 local_thread_inj:
632 local_thread_inj.clone(),
633 thread_run_flag:
634 thread_run_flag,
635 spti:
636 spti,
637 thread_last_id:
638 0,
639 poll_int:
640 poll_int
641 };
642
643 let thread_hndl =
644 std::thread::Builder::new()
645 .name(thread_name)
646 .spawn(|| worker.worker())
647 .map_err(|e|
648 map_timer_err!(TimerErrorType::SpawnError(e.kind()), "{}", e)
649 )?;
650
651
652 return Ok( ThreadHandler::new(thread_hndl, local_thread_inj, thread_run_flag_weak) );
653 }
654
655
656 fn worker(mut self) -> TimerResult<()>
657 {
658 std::thread::park();
660
661 while self.thread_run_flag.load(Ordering::Acquire) == true
662 {
663 while let Steal::Success(task) = self.local_thread_inj.steal()
665 {
666 match task
667 {
668 ThreadTask::TaskExec(task_exec) =>
669 {
670 let Some(ptgi) = task_exec.ptgi.upgrade()
671 else
672 {
673 continue;
675 };
676
677 match ptgi.task.lock().unwrap().exec()
679 {
680 PeriodicTaskResult::Ok =>
681 {},
682 PeriodicTaskResult::CancelTask =>
683 {
684 self
685 .global_task_injector
686 .push(GlobalTasks::SuspendTask(task_exec.ptgi.clone(), None));
687
688 let _ = self.poll_int.aquire().map(|v| v.interrupt_drop());
689 },
690 PeriodicTaskResult::TaskReSchedule(ptt) =>
691 {
692 self
693 .global_task_injector
694 .push(GlobalTasks::ReschedTask(task_exec.ptgi.clone(), ptt, None));
695
696 let _ = self.poll_int.aquire().map(|v| v.interrupt_drop());
697 }
698 }
699
700 drop(task_exec);
701 }
702 }
703 }
704
705 let spti_lock_res = self.spti.try_lock();
706
707 if let Ok(mut task_token) = spti_lock_res
708 {
709 let thread_hndl_cnt = task_token.thread_pool.get().unwrap().len();
710
711 while let Steal::Success(task) = self.global_task_injector.steal()
713 {
714 match task
715 {
716 GlobalTasks::AddTask(task_name, ptt, ptg, opt_err_ret) =>
717 {
718 if task_token.contains_task(&task_name) == true
719 {
720 if let Some(err_ret) = opt_err_ret
721 {
722 let err_msg =
723 map_timer_err!(TimerErrorType::Duplicate,
724 "thread: '{}', task: '{}' already exists", self.thread_name, task_name);
725
726 let _ = err_ret.send(Err(err_msg));
727 }
728
729 continue;
730 }
731
732 let period_task_ticket =
733 PeriodicTaskTicket::new(task_name.clone(), ptt, ptg, &task_token.timers_poll);
734
735 if let Err(e) = period_task_ticket
736 {
737 if let Some(err_ret) = opt_err_ret
738 {
739 let _ = err_ret.send(Err(e));
740 }
741
742 continue;
743 }
744
745 let period_task_ticket = period_task_ticket.unwrap();
746
747 task_token.insert_task(period_task_ticket);
748
749 if let Some(err_ret) = opt_err_ret
750 {
751 let _ = err_ret.send(Ok(()));
752 }
753 },
754 GlobalTasks::RemoveTask(ptg_arc, opt_err_ret) =>
755 {
756 let res = task_token.remove_task(&ptg_arc.task_name);
757
758 if let Err(e) = res
759 {
760 if let Some(err_ret) = opt_err_ret
761 {
762 let err_msg =
763 map_timer_err!(e.get_error_type(),
764 "thread: '{}', {}", self.thread_name, e.get_error_msg());
765
766 let _ = err_ret.send(Err(err_msg));
767 }
768
769 continue;
770 }
771
772 let ptt_ref = res.unwrap();
773
774 if let Err(e) = ptt_ref.unset_timer()
778 {
779 if let Some(err_ret) = opt_err_ret.as_ref()
780 {
781 let _ = err_ret.send(Err(e));
782 }
783
784 continue;
785 }
786
787 drop(ptt_ref);
788
789 drop(ptg_arc);
811 },
812 GlobalTasks::ReschedTask( ptg_weak, ptt, opt_err_ret ) =>
813 {
814 let Some(ptg_arc) = ptg_weak.upgrade()
815 else
816 {
817 continue;
819 };
820
821 let res_task = task_token.get_task_by_name(&ptg_arc.task_name);
832
833 let res =
834 match res_task
835 {
836 Ok(task) =>
837 {
838 let _ = task.unset_timer();
840
841 let res = task.update_task_time(ptt);
843
844 if let Err(e) = res
845 {
846 if let Some(err_ret) = opt_err_ret.as_ref()
847 {
848 let _ = err_ret.send(Err(e));
849 }
850
851 continue;
852 }
853
854 Ok(())
855 },
856 Err(err) =>
857 {
858 Err(err)
859 }
860 };
861
862 if let Some(err_ret) = opt_err_ret
863 {
864 let _ = err_ret.send(res);
865 }
866 },
867 GlobalTasks::SuspendTask(ptg_weak, opt_err_ret) =>
868 {
869 let Some(ptg_arc) = ptg_weak.upgrade()
870 else
871 {
872 continue;
874 };
875
876 let res_task = task_token.get_task_by_name(&ptg_arc.task_name);
886
887 if let Err(e) = res_task
888 {
889 if let Some(err_ret) = opt_err_ret.as_ref()
890 {
891 let _ = err_ret.send(Err(e));
892 }
893
894 continue;
895 }
896
897 let task = res_task.unwrap();
898
899 let res = task.unset_timer();
900
901 if let Some(err_ret) = opt_err_ret
902 {
903 let _ = err_ret.send(res);
904 }
905 },
906 GlobalTasks::ResumeTask(ptg_weak, opt_err_ret) =>
907 {
908 let Some(ptg_arc) = ptg_weak.upgrade()
909 else
910 {
911 continue;
913 };
914
915 let res_task = task_token.get_task_by_name(&ptg_arc.task_name);
925
926 if let Err(e) = res_task
927 {
928 if let Some(err_ret) = opt_err_ret.as_ref()
929 {
930 let _ = err_ret.send(Err(e));
931 }
932
933 continue;
934 }
935
936 let res = res_task.unwrap().set_timer();
937
938 if let Some(err_ret) = opt_err_ret.as_ref()
939 {
940 let _ = err_ret.send(res);
941 }
942 }
943 }
944 }
945
946 let Some(res) =
948 task_token
949 .timers_poll
950 .poll(Some(5000))?
951 else
952 {
953 continue;
954 };
955
956
957 for event in res
958 {
959 match event
960 {
961 PollEventType::TimerRes(timer_fd, timer_res) =>
962 {
963 let (ptg, ptt, ticket_arc_opt, task_name) =
974 {
975 let task =
976 task_token
977 .get_task_by_fd(timer_fd)
978 .map_err(|e|
979 map_timer_err!(e.get_error_type(), "thread: '{}', {}", self.thread_name, e.get_error_msg())
980 )?;
981
982 (task.ptg.clone(), task.ptt.clone(), task.weak_ticket.upgrade(), task.task_name.clone())
983 };
984
985 let Some(ptg_arc) = ptg.upgrade()
995 else
996 {
997 let _ = task_token.remove_task(&task_name);
1002
1003 continue;
1005 };
1006
1007 let overflow_cnt: u64 =
1009 match timer_res
1010 {
1011 TimerReadRes::Ok(overfl) =>
1012 {
1013 overfl
1014 },
1015 TimerReadRes::Cancelled =>
1016 {
1017 self
1020 .global_task_injector
1021 .push(
1022 GlobalTasks::ReschedTask(ptg.clone(), ptt.clone(), None)
1023 );
1024
1025 continue;
1027 },
1028 TimerReadRes::WouldBlock =>
1029 {
1030 panic!("assertion trap: timer retuned WouldBlock, {}", ptg_arc);
1032 }
1033 };
1034
1035 let ticket =
1040 match ticket_arc_opt
1041 {
1042 Some(ticket) =>
1043 ticket,
1044 None =>
1045 {
1046 let task_thread =
1047 {
1048 self.thread_last_id = (self.thread_last_id + 1) % thread_hndl_cnt;
1058
1059 task_token.clone_thread_handler(self.thread_last_id)
1061 };
1062
1063 let ticket =
1064 {
1065 Arc::new(
1067 NewTaskTicket::new(task_thread, ptg.clone())
1068 )
1069 };
1070
1071 let task =
1072 task_token
1073 .get_task_by_fd(timer_fd)
1074 .map_err(|e|
1075 map_timer_err!(e.get_error_type(), "thread: '{}', {}", self.thread_name, e.get_error_msg())
1076 )?;
1077
1078 task.weak_ticket = Arc::downgrade(&ticket);
1079
1080 ticket
1081 }
1082 };
1083
1084
1085
1086 NewTaskTicket::send_task(ticket, overflow_cnt, thread_hndl_cnt);
1087 },
1088 _ =>
1089 {
1090 },
1092 }
1093 } }
1095 else if let Err(TryLockError::WouldBlock) = spti_lock_res
1096 {
1097 if self.thread_run_flag.load(Ordering::Acquire) == false
1098 {
1099 return Ok(());
1100 }
1101
1102 if self.local_thread_inj.is_empty() == true
1103 {
1104 std::thread::park_timeout(Duration::from_secs(2));
1105 }
1106 }
1107
1108 } return Ok(());
1111 }
1112}
1113
1114
1115#[derive(Debug)]
1118struct ThreadHandler
1119{
1120 hndl: JoinHandle<TimerResult<()>>,
1122
1123 task_injector: Arc<Injector<ThreadTask>>,
1125
1126 thread_flag: Weak<AtomicBool>,
1128}
1129
1130impl ThreadHandler
1131{
1132 fn new(hndl: JoinHandle<TimerResult<()>>, task_injector: Arc<Injector<ThreadTask>>, thread_flag: Weak<AtomicBool>) -> Self
1133 {
1134 return
1135 Self
1136 {
1137 hndl,
1138 task_injector,
1139 thread_flag: thread_flag
1140 };
1141 }
1142
1143 fn stop(&self)
1144 {
1145 if let Some(v) = self.thread_flag.upgrade()
1146 {
1147 v.store(false, Ordering::Release);
1148 }
1149 }
1150
1151 fn unpark(&self)
1152 {
1153 self.hndl.thread().unpark();
1154 }
1155
1156 fn send_task(&self, task: Arc<NewTaskTicket>, unpark: bool)
1157 {
1158 self.task_injector.push(ThreadTask::TaskExec(task));
1159
1160 if unpark == true
1161 {
1162 self.hndl.thread().unpark();
1163 }
1164
1165 return;
1166 }
1167
1168 fn clean_local_queue(&self)
1169 {
1170 while let Steal::Success(_) = self.task_injector.steal() {}
1171
1172 return;
1173 }
1174}
1175
1176#[derive(Debug)]
1179pub struct SharedPeriodicTasks
1180{
1181 thread_pool: OnceCell<Arc<Vec<Arc<ThreadHandler>>>>,
1183
1184 tasks_by_fd: HashMap<RawFd, PeriodicTaskTicket>,
1186
1187 tasks_name_to_fd: HashMap<String, RawFd>,
1188
1189 timers_poll: TimerPoll,
1191}
1192
1193
1194impl SharedPeriodicTasks
1195{
1196 fn new() -> TimerResult<Self>
1197 {
1198
1199 return Ok(
1200 Self
1201 {
1202 thread_pool: OnceCell::default(),
1203 tasks_by_fd: HashMap::new(),
1204 tasks_name_to_fd: HashMap::new(),
1205 timers_poll: TimerPoll::new()?
1206 }
1207 );
1208 }
1209
1210 fn get_task_by_fd(&mut self, timer_fd: RawFd) -> TimerResult<&mut PeriodicTaskTicket>
1211 {
1212 return
1213 self
1214 .tasks_by_fd
1215 .get_mut(&timer_fd)
1216 .ok_or_else(||
1217 map_timer_err!(TimerErrorType::NotFound, "task fd: '{}' was found but task was not found",
1218 timer_fd.as_raw_fd())
1219 );
1220 }
1221
1222 fn get_task_by_name(&mut self, task_name: &str) -> TimerResult<&mut PeriodicTaskTicket>
1223 {
1224 let res =
1225 self
1226 .tasks_name_to_fd
1227 .get(task_name)
1228 .ok_or_else(||
1229 map_timer_err!(TimerErrorType::NotFound, "task: '{}' was not found", task_name)
1230 )
1231 .map(|v|
1232 self
1233 .tasks_by_fd
1234 .get_mut(v)
1235 .ok_or_else(||
1236 map_timer_err!(TimerErrorType::NotFound, "task: '{}' fd: '{}' was found but task was not found",
1237 task_name, v)
1238 )
1239 )??;
1240
1241 return Ok(res);
1242 }
1243
1244 fn clone_thread_handler(&self, thread_last_id: usize) -> Arc<ThreadHandler>
1245 {
1246 let thread_local_hnd =
1247 self
1248 .thread_pool
1249 .get()
1250 .unwrap()[thread_last_id]
1251 .clone();
1252
1253 return thread_local_hnd;
1254 }
1255
1256 fn remove_task(&mut self, task_name: &str) -> TimerResult<PeriodicTaskTicket>
1257 {
1258 let Some(task_timer_fd) = self.tasks_name_to_fd.remove(task_name)
1259 else
1260 {
1261 timer_err!(TimerErrorType::NotFound, "task: '{}' was not found", task_name);
1262 };
1263
1264 let Some(ptt) = self.tasks_by_fd.remove(&task_timer_fd)
1265 else
1266 {
1267 timer_err!(TimerErrorType::NotFound, "task: '{}' fd: '{}' was found but task was not found",
1268 task_name, task_timer_fd);
1269 };
1270
1271 return Ok(ptt);
1272 }
1273
1274 fn contains_task(&self, task_name: &str) -> bool
1275 {
1276 return self.tasks_name_to_fd.contains_key(task_name);
1277 }
1278
1279 fn insert_task(&mut self, period_task_ticket: PeriodicTaskTicket)
1280 {
1281 let raw_fd = period_task_ticket.sync_timer.get_inner().as_raw_fd();
1282
1283 self.tasks_name_to_fd.insert(period_task_ticket.task_name.clone(), raw_fd);
1284 self.tasks_by_fd.insert(raw_fd, period_task_ticket);
1285
1286 return;
1287 }
1288}
1289
1290#[derive(Debug)]
1291pub struct SyncPeriodicTasksInner
1292{
1293 poll_int: PollInterrupt,
1295
1296 task_injector: Arc<Injector<GlobalTasks>>,
1298}
1299
1300impl SyncPeriodicTasksInner
1301{
1302 fn send_global_cmd(&self, glob: GlobalTasks) -> TimerResult<()>
1303 {
1304 let poll_int =
1305 self.poll_int.aquire()?;
1306
1307 self.task_injector.push(glob);
1308
1309 poll_int.interrupt_drop()?;
1310
1311 return Ok(());
1312 }
1313
1314 fn clear_global_queue(&self)
1315 {
1316 while let Steal::Success(_) = self.task_injector.steal() {}
1317
1318 return;
1319 }
1320}
1321
1322#[derive(Debug, Clone)]
1334pub struct SyncPeriodicTasks
1335{
1336 threads: Option<Arc<Vec<Arc<ThreadHandler>>>>,
1337
1338 inner: Arc<SyncPeriodicTasksInner>,
1339}
1340
1341impl Drop for SyncPeriodicTasks
1342{
1343 fn drop(&mut self)
1344 {
1345 self.inner.clear_global_queue();
1346
1347 let mut threads = self.threads.take().unwrap();
1348
1349 for thread in threads.iter()
1351 {
1352 thread.stop();
1353 thread.unpark();
1354 }
1355
1356 let _ = self.inner.poll_int.aquire().map(|v| v.interrupt_drop());
1358
1359 for _ in 0..5
1360 {
1361 let threads_unwr =
1362 match Arc::try_unwrap(threads)
1363 {
1364 Ok(r) => r,
1365 Err(e) =>
1366 {
1367 threads = e;
1368
1369 std::thread::sleep(Duration::from_millis(500));
1370
1371 continue;
1372
1373 }
1374 };
1375
1376 for thread in threads_unwr
1377 {
1378 thread.clean_local_queue();
1379
1380 let Some(thread) = Arc::into_inner(thread)
1381 else
1382 {
1383 panic!("assertion trap: ~SyncPeriodicTasks, a reference to ThreadHandler left somewhere");
1384 };
1385
1386 let _ = thread.hndl.join();
1387 }
1388
1389 break;
1390 }
1391 }
1392}
1393
1394
1395impl SyncPeriodicTasks
1396{
1397
1398
1399 pub
1412 fn new(threads_cnt: NonZeroUsize) -> TimerResult<Self>
1413 {
1414 let spti = SharedPeriodicTasks::new()?;
1415 let poll_int = spti.timers_poll.get_poll_interruptor();
1416
1417 let spti = Arc::new(Mutex::new(spti));
1419
1420 let task_injector = Arc::new(Injector::<GlobalTasks>::new());
1421
1422 let mut thread_hndls: Vec<Arc<ThreadHandler>> = Vec::with_capacity(threads_cnt.get());
1423
1424 for i in 0..threads_cnt.get()
1426 {
1427 let handler =
1428 ThreadWorker::new(format!("timer_exec/{}s", i), task_injector.clone(), spti.clone(), poll_int.clone())?;
1429
1430 thread_hndls.push(Arc::new(handler));
1431 }
1432
1433 let thread_hndls = Arc::new(thread_hndls);
1434
1435 let spti_lock = spti.lock().unwrap();
1437 spti_lock.thread_pool.get_or_init(|| thread_hndls.clone());
1438
1439 let thread =
1441 spti_lock
1442 .thread_pool
1443 .get()
1444 .unwrap()
1445 .get(random_range(0..threads_cnt.get()))
1446 .unwrap()
1447 .clone();
1448
1449 drop(spti_lock);
1450
1451 thread.unpark();
1453
1454 let inner =
1455 SyncPeriodicTasksInner
1456 {
1457 poll_int: poll_int,
1458 task_injector: task_injector,
1459 };
1460
1461 return Ok(
1462 Self
1463 {
1464 threads: Some(thread_hndls),
1465 inner: Arc::new(inner),
1466 }
1467 );
1468 }
1469
1470 pub
1484 fn add<T>(&self, task_name: impl Into<String>, task: T, task_time: PeriodicTaskTime) -> TimerResult<PeriodicTaskGuard>
1485 where T: PeriodicTask
1486 {
1487 let task_int: PeriodicTaskHndl = Box::new(task);
1488
1489 let task_name_str: String = task_name.into();
1490
1491 let period_task_guard =
1492 Arc::new(PeriodicTaskGuardInner::new(task_name_str.clone(), task_int)?);
1493
1494
1495 let (mpsc_send, mpsc_recv) = mpsc::channel();
1499
1500 self.inner.send_global_cmd(GlobalTasks::AddTask(task_name_str.clone(), task_time, period_task_guard.clone(), Some(mpsc_send)) )?;
1501
1502 let _ =
1503 mpsc_recv
1504 .recv()
1505 .map_err(|e|
1506 map_timer_err!(TimerErrorType::ExternalError, "mpsc error: {}", e)
1507 )??;
1508
1509 let ret =
1510 PeriodicTaskGuard
1511 {
1512 task_name: task_name_str,
1513 guard: Some(period_task_guard),
1514 spt: self.inner.clone()
1515 };
1516
1517 return Ok(ret);
1518 }
1519
1520 pub
1530 fn check_thread_status(&self) -> Option<String>
1531 {
1532 for thread in self.threads.as_ref().unwrap().iter()
1533 {
1534 if let None = thread.thread_flag.upgrade()
1535 {
1536 return Some(thread.hndl.thread().name().unwrap().to_string());
1537 }
1538 }
1539
1540 return None;
1541 }
1542}
1543
1544#[cfg(test)]
1545mod tests
1546{
1547 use std::{sync::mpsc::{self, RecvTimeoutError, Sender}, time::{Duration, Instant}};
1548
1549 use crate::{periodic_task::sync_tasks::{PeriodicTask, PeriodicTaskResult, PeriodicTaskTime, SyncPeriodicTasks}, AbsoluteTime, RelativeTime};
1550
1551 #[derive(Debug)]
1552 struct TaskStruct1
1553 {
1554 a1: u64,
1555 s: Sender<u64>,
1556 }
1557
1558 impl TaskStruct1
1559 {
1560 fn new(a1: u64, s: Sender<u64>) -> Self
1561 {
1562 return Self{ a1: a1, s };
1563 }
1564 }
1565
1566 impl PeriodicTask for TaskStruct1
1567 {
1568 fn exec(&mut self) -> PeriodicTaskResult
1569 {
1570 println!("taskstruct1 val: {}", self.a1);
1571
1572 let _ = self.s.send(self.a1);
1573
1574 return PeriodicTaskResult::Ok;
1575 }
1576 }
1577
1578 #[derive(Debug)]
1579 struct TaskStruct2
1580 {
1581 a1: u64,
1582 s: Sender<u64>,
1583 }
1584
1585 impl TaskStruct2
1586 {
1587 fn new(a1: u64, s: Sender<u64>) -> Self
1588 {
1589 return Self{ a1: a1, s };
1590 }
1591 }
1592
1593 impl PeriodicTask for TaskStruct2
1594 {
1595 fn exec(&mut self) -> PeriodicTaskResult
1596 {
1597 println!("taskstruct2 val: {}", self.a1);
1598
1599 self.s.send(self.a1).unwrap();
1600
1601 return PeriodicTaskResult::TaskReSchedule(PeriodicTaskTime::exact_time(AbsoluteTime::now() + RelativeTime::new_time(2, 0)));
1602 }
1603 }
1604
1605 #[test]
1606 fn test1_absolute_simple()
1607 {
1608 let s = SyncPeriodicTasks::new(1.try_into().unwrap()).unwrap();
1609
1610 let (send, recv) = mpsc::channel::<u64>();
1611
1612 let task1 = TaskStruct1::new(2, send);
1613 let task1_ptt = PeriodicTaskTime::exact_time(AbsoluteTime::now() + RelativeTime::new_time(3, 0));
1614 let task1_guard = s.add("task1", task1, task1_ptt).unwrap();
1615
1616 println!("added");
1617 let val = recv.recv();
1618
1619 println!("{:?}", val);
1620
1621 drop(task1_guard);
1622 }
1623
1624 #[test]
1625 fn test1_relative_simple()
1626 {
1627 let s = SyncPeriodicTasks::new(1.try_into().unwrap()).unwrap();
1628
1629 let (send, recv) = mpsc::channel::<u64>();
1630
1631 let task1 = TaskStruct1::new(2, send);
1632 let task1_ptt = PeriodicTaskTime::interval(RelativeTime::new_time(1, 0));
1633 let task1_guard = s.add("task1", task1, task1_ptt).unwrap();
1634
1635 let mut s = Instant::now();
1636
1637 for i in 0..3
1638 {
1639 let val = recv.recv().unwrap();
1640
1641 let e = s.elapsed();
1642 s = Instant::now();
1643
1644 println!("{}: {:?} {:?} {}", i, val, e, e.as_micros());
1645
1646 assert!(999000 < e.as_micros() && e.as_micros() < 10001200);
1647 assert_eq!(val, 2);
1648 }
1649
1650 drop(task1_guard);
1651
1652 std::thread::sleep(Duration::from_millis(100));
1653
1654 return;
1655 }
1656
1657 #[test]
1658 fn test1_relative_resched_to_abs()
1659 {
1660 let s = SyncPeriodicTasks::new(1.try_into().unwrap()).unwrap();
1661
1662 let (send, recv) = mpsc::channel::<u64>();
1663
1664 let task1 = TaskStruct2::new(2, send);
1665 let task1_ptt = PeriodicTaskTime::interval(RelativeTime::new_time(1, 0));
1666 let task1_guard = s.add("task1", task1, task1_ptt).unwrap();
1667
1668 let s = Instant::now();
1669 match recv.recv_timeout(Duration::from_millis(1150))
1670 {
1671 Ok(rcv_a) =>
1672 {
1673 let e = s.elapsed();
1674 println!("{:?} {}", e, e.as_micros());
1675 assert_eq!(rcv_a, 2);
1676 assert!(999051 < e.as_micros() && e.as_micros() < 1000551);
1677 },
1678 Err(RecvTimeoutError::Timeout) =>
1679 panic!("tineout"),
1680 Err(e) =>
1681 panic!("{}", e),
1682 }
1683
1684 let s = Instant::now();
1685 match recv.recv_timeout(Duration::from_millis(2100))
1686 {
1687 Ok(rcv_a) =>
1688 {
1689 let e = s.elapsed();
1690 println!("{:?} {}", e, e.as_micros());
1691 assert_eq!(rcv_a, 2);
1692 assert!(1999642 < e.as_micros() && e.as_micros() < 2000342);
1693 },
1694 Err(RecvTimeoutError::Timeout) =>
1695 panic!("tineout"),
1696 Err(e) =>
1697 panic!("{}", e),
1698 }
1699
1700 let s = Instant::now();
1701 match recv.recv_timeout(Duration::from_millis(2100))
1702 {
1703 Ok(rcv_a) =>
1704 {
1705 let e = s.elapsed();
1706 println!("{:?} {}", e, e.as_micros());
1707 assert_eq!(rcv_a, 2);
1708 assert!(1999642 < e.as_micros() && e.as_micros() < 2000342);
1709 },
1710 Err(RecvTimeoutError::Timeout) =>
1711 panic!("tineout"),
1712 Err(e) =>
1713 panic!("{}", e),
1714 }
1715
1716 drop(task1_guard);
1717
1718 std::thread::sleep(Duration::from_millis(100));
1719
1720 return;
1721 }
1722
1723 #[test]
1724 fn test1_relative_simple_resched()
1725 {
1726 let s = SyncPeriodicTasks::new(1.try_into().unwrap()).unwrap();
1727
1728 let (send, recv) = mpsc::channel::<u64>();
1729
1730 let task1 = TaskStruct1::new(2, send);
1731 let task1_ptt =
1732 PeriodicTaskTime::interval(
1733 RelativeTime::new_time(1, 0)
1734 );
1735 let task1_guard = s.add("task1", task1, task1_ptt).unwrap();
1736
1737 let mut s = Instant::now();
1738
1739 for i in 0..3
1740 {
1741 let val = recv.recv().unwrap();
1742
1743 let e = s.elapsed();
1744 s = Instant::now();
1745
1746 println!("{}: {:?} {:?} {}", i, val, e, e.as_micros());
1747
1748 assert!(999000 < e.as_micros() && e.as_micros() < 10001200);
1749 assert_eq!(val, 2);
1750 }
1751
1752 task1_guard
1753 .reschedule_task(
1754 PeriodicTaskTime::exact_time(AbsoluteTime::now() + RelativeTime::new_time(2, 0))
1755 )
1756 .unwrap();
1757
1758 s = Instant::now();
1759 let val = recv.recv().unwrap();
1760 let e = s.elapsed();
1761
1762 println!("resched: {:?} {:?} {}", val, e, e.as_micros());
1763
1764 assert!(1999000 < e.as_micros() && e.as_micros() < 2000560);
1765
1766 let val = recv.recv_timeout(Duration::from_secs(3));
1767
1768 assert_eq!(val.is_err(), true);
1769 assert_eq!(val.err().unwrap(), RecvTimeoutError::Timeout);
1770
1771 drop(task1_guard);
1772
1773 std::thread::sleep(Duration::from_millis(100));
1774
1775 return;
1776 }
1777
1778 #[test]
1779 fn test1_relative_simple_cancel()
1780 {
1781 let s = SyncPeriodicTasks::new(1.try_into().unwrap()).unwrap();
1782
1783 let (send, recv) = mpsc::channel::<u64>();
1784
1785 let task1 = TaskStruct1::new(0, send.clone());
1786 let task1_ptt =
1787 PeriodicTaskTime::interval(RelativeTime::new_time(1, 0));
1788
1789 let task2 = TaskStruct1::new(1, send.clone());
1790 let task2_ptt =
1791 PeriodicTaskTime::interval(RelativeTime::new_time(2, 0));
1792
1793 let task3 = TaskStruct1::new(2, send);
1794 let task3_ptt =
1795 PeriodicTaskTime::interval(RelativeTime::new_time(0, 500_000_000));
1796
1797 let task1_guard = s.add("task1", task1, task1_ptt).unwrap();
1798 let task2_guard = s.add("task2", task2, task2_ptt).unwrap();
1799 let task3_guard = s.add("task3", task3, task3_ptt).unwrap();
1800
1801 let mut a_cnt: [u8; 3] = [0_u8; 3];
1802
1803 let end = AbsoluteTime::now() + RelativeTime::new_time(5, 100_000_000);
1804
1805 while AbsoluteTime::now() < end
1806 {
1807 match recv.recv_timeout(Duration::from_millis(1))
1808 {
1809 Ok(rcv_a) =>
1810 a_cnt[rcv_a as usize] += 1,
1811 Err(RecvTimeoutError::Timeout) =>
1812 continue,
1813 Err(e) =>
1814 panic!("{}", e),
1815 }
1816
1817
1818 }
1819
1820 assert_eq!(a_cnt[0], 5);
1821 assert_eq!(a_cnt[1], 2);
1822 assert_eq!(a_cnt[2], 10);
1823
1824 task3_guard.suspend_task().unwrap();
1826
1827
1828 let end = AbsoluteTime::now() + RelativeTime::new_time(5, 100_000_000);
1829
1830 while AbsoluteTime::now() < end
1831 {
1832 match recv.recv_timeout(Duration::from_millis(1))
1833 {
1834 Ok(rcv_a) =>
1835 a_cnt[rcv_a as usize] += 1,
1836 Err(RecvTimeoutError::Timeout) =>
1837 continue,
1838 Err(e) =>
1839 panic!("{}", e),
1840 }
1841
1842
1843 }
1844
1845 assert_eq!(a_cnt[0] > 5, true);
1846 assert_eq!(a_cnt[1] > 2, true);
1847 assert!((a_cnt[2] == 10 || a_cnt[2] == 11));
1848
1849 drop(task1_guard);
1850 drop(task2_guard);
1851 drop(task3_guard);
1852
1853 let end = AbsoluteTime::now() + RelativeTime::new_time(5, 100_000_000);
1854
1855 while AbsoluteTime::now() < end
1856 {
1857 match recv.recv_timeout(Duration::from_millis(1))
1858 {
1859 Ok(rcv_a) =>
1860 a_cnt[rcv_a as usize] += 1,
1861 Err(RecvTimeoutError::Timeout) =>
1862 continue,
1863 Err(_) =>
1864 break,
1865 }
1866
1867
1868 }
1869
1870 assert_eq!(AbsoluteTime::now() < end, true);
1871
1872
1873
1874 return;
1875 }
1876
1877 #[test]
1879 fn test2_multithread_1()
1880 {
1881 let s = SyncPeriodicTasks::new(2.try_into().unwrap()).unwrap();
1882
1883 let (send, recv) = mpsc::channel::<u64>();
1884
1885 let task1 = TaskStruct1::new(0, send.clone());
1886 let task1_ptt =
1887 PeriodicTaskTime::interval(RelativeTime::new_time(1, 0));
1888
1889 let task2 = TaskStruct1::new(1, send.clone());
1890 let task2_ptt =
1891 PeriodicTaskTime::interval(RelativeTime::new_time(2, 0));
1892
1893 let task3 = TaskStruct1::new(2, send.clone());
1894 let task3_ptt =
1895 PeriodicTaskTime::interval(RelativeTime::new_time(0, 500_000_000));
1896
1897 let task4 = TaskStruct1::new(3, send.clone());
1898 let task4_ptt =
1899 PeriodicTaskTime::interval(RelativeTime::new_time(0, 200_000_000));
1900
1901 let task5 = TaskStruct1::new(4, send.clone());
1902 let task5_ptt =
1903 PeriodicTaskTime::exact_time(AbsoluteTime::now() + RelativeTime::new_time(5, 0));
1904
1905 let task1_guard = s.add("task1", task1, task1_ptt).unwrap();
1906 let task2_guard = s.add("task2", task2, task2_ptt).unwrap();
1907 let task3_guard = s.add("task3", task3, task3_ptt).unwrap();
1908 let task4_guard = s.add("task4", task4, task4_ptt).unwrap();
1909 let task5_guard = s.add("task5", task5, task5_ptt).unwrap();
1910
1911
1912 let mut a_cnt: [u8; 5] = [0_u8; 5];
1913
1914 let end = AbsoluteTime::now() + RelativeTime::new_time(5, 500_000_000);
1915
1916 while AbsoluteTime::now() < end
1917 {
1918 match recv.recv_timeout(Duration::from_millis(1))
1919 {
1920 Ok(rcv_a) =>
1921 a_cnt[rcv_a as usize] += 1,
1922 Err(RecvTimeoutError::Timeout) =>
1923 continue,
1924 Err(e) =>
1925 panic!("{}", e),
1926 }
1927
1928
1929 }
1930
1931 println!("{:?}", a_cnt);
1932
1933 assert!(a_cnt[0] == 5);
1934 assert!(a_cnt[1] == 2);
1935 assert!((a_cnt[2] == 10 || a_cnt[2] == 11));
1936 assert!(a_cnt[3] == 27);
1937 assert!(a_cnt[4] == 1);
1938
1939 task5_guard.reschedule_task(PeriodicTaskTime::exact_time(AbsoluteTime::now() + RelativeTime::new_time(0, 500_000_000))).unwrap();
1940
1941 let end = AbsoluteTime::now() + RelativeTime::new_time(0, 600_000_000);
1942
1943 while AbsoluteTime::now() < end
1944 {
1945 match recv.recv_timeout(Duration::from_millis(1))
1946 {
1947 Ok(rcv_a) =>
1948 a_cnt[rcv_a as usize] += 1,
1949 Err(RecvTimeoutError::Timeout) =>
1950 continue,
1951 Err(e) =>
1952 panic!("{}", e),
1953 }
1954 }
1955
1956 println!("{:?}", a_cnt);
1957 assert!(a_cnt[4] == 2);
1958
1959 drop(task5_guard);
1960 drop(task4_guard);
1961 drop(task3_guard);
1962 drop(task2_guard);
1963
1964 let end = AbsoluteTime::now() + RelativeTime::new_time(2, 1000);
1965
1966 while AbsoluteTime::now() < end
1967 {
1968 match recv.recv_timeout(Duration::from_millis(1))
1969 {
1970 Ok(rcv_a) =>
1971 a_cnt[rcv_a as usize] += 1,
1972 Err(RecvTimeoutError::Timeout) =>
1973 continue,
1974 Err(e) =>
1975 panic!("{}", e),
1976 }
1977 }
1978
1979
1980 println!("{:?}", a_cnt);
1981 assert_eq!(a_cnt[4], 2);
1982 assert_eq!(a_cnt[0], 8);
1983
1984 drop(task1_guard);
1985
1986 std::thread::sleep(Duration::from_millis(10));
1987 return;
1988 }
1989}