1use std::
17{
18 cell::{OnceCell, RefCell}, collections::HashMap, fmt, mem, num::NonZeroUsize, os::fd::{AsFd, AsRawFd, BorrowedFd, RawFd}, sync::
19 {
20 atomic::{AtomicBool, Ordering},
21 mpsc::{self},
22 Arc,
23 Mutex,
24 TryLockError,
25 Weak
26 }, thread::JoinHandle, time::Duration
27};
28
29use crossbeam_deque::{Injector, Steal};
30use rand::random_range;
31
32use crate::
33{
34 error::{TimerError, TimerErrorType, TimerResult},
35 map_timer_err,
36 timer_portable::{poll::PollInterrupt, timer::TimerFd, PollEventType, TimerExpMode, TimerFlags, TimerType},
37 AbsoluteTime,
38 FdTimerCom,
39 RelativeTime,
40 TimerPoll,
41 TimerReadRes
42};
43
44#[derive(Debug, Clone, Copy, PartialEq, Eq)]
49pub enum PeriodicTaskResult
50{
51 Ok,
53
54 CancelTask,
60
61 TaskReSchedule(PeriodicTaskTime)
64}
65
66pub trait PeriodicTask: Send + fmt::Debug + 'static
76{
77 fn exec(&mut self) -> PeriodicTaskResult;
79}
80
81pub type PeriodicTaskHndl = Box<dyn PeriodicTask>;
83
84
85#[derive(Debug)]
90pub(crate) enum GlobalTasks
91{
92 AddTask( PeriodicTaskTicket, Option<mpsc::Sender<TimerResult<()>>> ),
95
96 RemoveTask( Arc<PeriodicTaskGuardInner>, Option<mpsc::Sender<TimerResult<()>>> ),
98
99 ReschedTask( Weak<PeriodicTaskGuardInner>, PeriodicTaskTime, Option<mpsc::Sender<TimerResult<()>>> ),
101
102 SuspendTask( Weak<PeriodicTaskGuardInner>, Option<mpsc::Sender<TimerResult<()>>> ),
105
106 ResumeTask( Weak<PeriodicTaskGuardInner>, Option<mpsc::Sender<TimerResult<()>>> ),
108}
109
110#[derive(Debug)]
112pub enum ThreadTask
113{
114 TaskExec( Arc<NewTaskTicket> )
116}
117
118#[derive(Debug)]
122pub struct NewTaskTicket
123{
124 task_thread: Arc<ThreadHandler>,
126 ptgi: Weak<PeriodicTaskGuardInner>,
127}
128
129impl NewTaskTicket
130{
131 fn new(task_thread: Arc<ThreadHandler>, ptgi: Weak<PeriodicTaskGuardInner>) -> Self
132 {
133 return
134 Self
135 {
136 task_thread:
137 task_thread,
138 ptgi:
139 ptgi
140 };
141 }
142
143 fn send_task(this: Arc<NewTaskTicket>, task_rep_count: u64, thread_hndl_cnt: usize)
145 {
146 let strong_cnt = Arc::strong_count(&this);
147 let thread = this.task_thread.clone();
148
149 for _ in 0..task_rep_count
150 {
151 thread.send_task(this.clone(), strong_cnt < 2 && thread_hndl_cnt > 1);
152 }
153 }
154}
155
156#[derive(Debug)]
159pub(crate) struct PeriodicTaskTicket
160{
161 task_name: String,
163
164 ptt: PeriodicTaskTime,
166
167 weak_ticket: Weak<NewTaskTicket>,
171
172 ptg: Weak<PeriodicTaskGuardInner>,
175}
176
177impl PeriodicTaskTicket
178{
179 fn new(task_name: String, ptt: PeriodicTaskTime, ptg: Weak<PeriodicTaskGuardInner>) -> Self
180 {
181 return
182 Self
183 {
184 task_name: task_name,
185 ptt: ptt,
186 weak_ticket: Weak::new(),
187 ptg: ptg,
188 };
189 }
190
191
192 fn get_task_guard(&self) -> TimerResult<Arc<PeriodicTaskGuardInner>>
193 {
194 return
195 self
196 .ptg
197 .upgrade()
198 .ok_or_else(||
199 map_timer_err!(TimerErrorType::ReferenceGone, "task: '{}' reference to timer has gone",
200 self.task_name)
201 );
202 }
203
204 fn get_timer_time(&self) -> &PeriodicTaskTime
205 {
206 return &self.ptt;
207 }
208}
209
210#[derive(Debug)]
214pub struct PeriodicTaskGuard
215{
216 task_name: String,
218
219 guard: Option<Arc<PeriodicTaskGuardInner>>,
227
228 spt: Arc<SyncPeriodicTasksInner>
231}
232
233impl Drop for PeriodicTaskGuard
234{
235 fn drop(&mut self)
236 {
237 let guard = self.guard.take().unwrap();
238
239 let _ = self.spt.send_global_cmd(GlobalTasks::RemoveTask(guard, None));
240
241 return;
242 }
243}
244
245impl PeriodicTaskGuard
246{
247 pub
275 fn reschedule_task(&self, ptt: PeriodicTaskTime) -> TimerResult<()>
276 {
277 let weak_ptgi = Arc::downgrade(self.guard.as_ref().unwrap());
278
279 let (snd, rcv) = mpsc::channel::<Result<(), TimerError>>();
280
281 self.spt.send_global_cmd(GlobalTasks::ReschedTask(weak_ptgi, ptt, Some(snd)))?;
282
283 return
284 rcv
285 .recv_timeout(Duration::from_secs(10))
286 .map_err(|e|
287 map_timer_err!(TimerErrorType::MpscTimeout, "reschedule_task(), task name: '{}', timer '{}' MPSC rcv timeout error: '{}'",
288 self.task_name, self.guard.as_ref().unwrap().timer_fd, e)
289 )?;
290 }
291
292 pub
312 fn suspend_task(&self) -> TimerResult<()>
313 {
314 let weak_ptgi = Arc::downgrade(self.guard.as_ref().unwrap());
315
316 let (snd, rcv) = mpsc::channel::<Result<(), TimerError>>();
317
318 self.spt.send_global_cmd(GlobalTasks::SuspendTask(weak_ptgi, Some(snd)))?;
319
320 return
321 rcv
322 .recv_timeout(Duration::from_secs(10))
323 .map_err(|e|
324 map_timer_err!(TimerErrorType::MpscTimeout, "suspend_task(), task name: '{}', timer '{}' MPSC rcv timeout error: '{}'",
325 self.task_name, self.guard.as_ref().unwrap().timer_fd, e)
326 )?;
327 }
328
329 pub
349 fn resume_task(&self) -> TimerResult<()>
350 {
351 let weak_ptgi = Arc::downgrade(self.guard.as_ref().unwrap());
352
353 let (snd, rcv) = mpsc::channel::<Result<(), TimerError>>();
354
355 self.spt.send_global_cmd(GlobalTasks::ResumeTask(weak_ptgi, Some(snd)))?;
356
357 return
358 rcv
359 .recv_timeout(Duration::from_secs(10))
360 .map_err(|e|
361 map_timer_err!(TimerErrorType::MpscTimeout, "resume_task(), task name: '{}', timer '{}' MPSC rcv timeout error: '{}'",
362 self.task_name, self.guard.as_ref().unwrap().timer_fd, e)
363 )?;
364 }
365}
366
367#[derive(Debug, Clone, Copy, PartialEq, Eq)]
371pub enum PeriodicTaskTime
372{
373 Absolute(TimerExpMode<AbsoluteTime>),
375
376 Relative(TimerExpMode<RelativeTime>),
378}
379
380impl From<TimerExpMode<AbsoluteTime>> for PeriodicTaskTime
381{
382 fn from(value: TimerExpMode<AbsoluteTime>) -> Self
383 {
384 return Self::Absolute(value);
385 }
386}
387
388impl From<TimerExpMode<RelativeTime>> for PeriodicTaskTime
389{
390 fn from(value: TimerExpMode<RelativeTime>) -> Self
391 {
392 return Self::Relative(value);
393 }
394}
395
396impl fmt::Display for PeriodicTaskTime
397{
398 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result
399 {
400 match self
401 {
402 Self::Absolute(t) =>
403 write!(f, "{}", t),
404 Self::Relative(t) =>
405 write!(f, "{}", t),
406 }
407 }
408}
409
410impl PeriodicTaskTime
411{
412 #[inline]
415 pub
416 fn exact_time(abs_time: AbsoluteTime) -> Self
417 {
418 return Self::Absolute(TimerExpMode::<AbsoluteTime>::new_oneshot(abs_time));
419 }
420
421 #[inline]
423 pub
424 fn interval(rel_time: RelativeTime) -> Self
425 {
426 return Self::Relative(TimerExpMode::<RelativeTime>::new_interval(rel_time));
427 }
428
429 #[inline]
438 pub
439 fn interval_with_start_delay(start_del_time: RelativeTime, rel_int_time: RelativeTime) -> Self
440 {
441 return Self::Relative(TimerExpMode::<RelativeTime>::new_interval_with_init_delay(start_del_time, rel_int_time));
442 }
443
444 fn set_timer(&self, timer_fd: &TimerFd) -> TimerResult<()>
446 {
447 match *self
448 {
449 Self::Absolute(timer_exp_mode) =>
450 return
451 timer_fd
452 .set_time(timer_exp_mode)
453 .map_err(|e|
454 map_timer_err!(TimerErrorType::TimerError(e.get_errno()), "cannot set time '{}' for timer: '{}'", timer_exp_mode, timer_fd )
455 ),
456 Self::Relative(timer_exp_mode) =>
457 return
458 timer_fd
459 .set_time(timer_exp_mode)
460 .map_err(|e|
461 map_timer_err!(TimerErrorType::TimerError(e.get_errno()), "cannot set time '{}' for timer: '{}'", timer_exp_mode, timer_fd )
462 ),
463 }
464 }
465}
466
467
468#[derive(Debug)]
470pub(crate) struct PeriodicTaskGuardInner
471{
472 timer_fd: TimerFd,
474
475 task: Mutex<PeriodicTaskHndl>,
481}
482
483impl fmt::Display for PeriodicTaskGuardInner
484{
485 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result
486 {
487 write!(f, "timer: '{}'", self.timer_fd)
488 }
489}
490
491impl AsFd for PeriodicTaskGuardInner
492{
493 fn as_fd(&self) -> BorrowedFd<'_>
494 {
495 return self.timer_fd.as_fd();
496 }
497}
498
499impl PeriodicTaskGuardInner
500{
501 fn new(timer_name: String, task_inst: PeriodicTaskHndl) -> TimerResult<Self>
502 {
503 let timer =
504 TimerFd::new(timer_name.clone().into(), TimerType::CLOCK_REALTIME,
505 TimerFlags::TFD_CLOEXEC | TimerFlags::TFD_NONBLOCK)
506 .map_err(|e|
507 map_timer_err!(TimerErrorType::TimerError(e.get_errno()), "cannot setup timer for task: '{}'", timer_name)
508 )?;
509
510 return Ok(Self { timer_fd: timer, task: Mutex::new(task_inst) });
511 }
512
513 #[inline]
514 fn setup_timer(&self, task_time_set: &PeriodicTaskTime) -> TimerResult<()>
515 {
516 return task_time_set.set_timer(&self.timer_fd);
517 }
518
519 #[inline]
520 fn unset_timer(&self) -> TimerResult<()>
521 {
522 return
523 self
524 .timer_fd
525 .unset_time()
526 .map_err(|e|
527 map_timer_err!(TimerErrorType::TimerError(e.get_errno()), "unsetting timer '{}' returned error: {}", self.timer_fd, e)
528 );
529 }
530}
531
532struct ThreadWorker
534{
535 thread_name: String,
537
538 global_task_injector: Arc<Injector<GlobalTasks>>,
540
541 local_thread_inj: Arc<Injector<ThreadTask>>,
543
544 thread_run_flag: Arc<AtomicBool>,
547
548 spti: Arc<Mutex<SharedPeriodicTasks>>,
553
554 thread_last_id: usize,
556
557 poll_int: PollInterrupt
559}
560
561impl ThreadWorker
562{
563 fn new(thread_name: String, global_task_injector: Arc<Injector<GlobalTasks>>,
564 spti: Arc<Mutex<SharedPeriodicTasks>>, poll_int: PollInterrupt) -> TimerResult<ThreadHandler>
565 {
566 let local_thread_inj = Arc::new(Injector::<ThreadTask>::new());
567 let thread_run_flag = Arc::new(AtomicBool::new(true));
568 let thread_run_flag_weak = Arc::downgrade(&thread_run_flag);
569
570 let worker =
571 ThreadWorker
572 {
573 thread_name:
574 thread_name.clone(),
575 global_task_injector:
576 global_task_injector,
577 local_thread_inj:
578 local_thread_inj.clone(),
579 thread_run_flag:
580 thread_run_flag,
581 spti:
582 spti,
583 thread_last_id:
584 0,
585 poll_int:
586 poll_int
587 };
588
589 let thread_hndl =
590 std::thread::Builder::new()
591 .name(thread_name)
592 .spawn(|| worker.worker())
593 .map_err(|e|
594 map_timer_err!(TimerErrorType::SpawnError(e.kind()), "{}", e)
595 )?;
596
597
598 return Ok( ThreadHandler::new(thread_hndl, local_thread_inj, thread_run_flag_weak) );
599 }
600
601
602 fn worker(mut self) -> TimerResult<()>
603 {
604 std::thread::park();
606
607 while self.thread_run_flag.load(Ordering::Acquire) == true
608 {
609 while let Steal::Success(task) = self.local_thread_inj.steal()
611 {
612 match task
613 {
614 ThreadTask::TaskExec(task_exec) =>
615 {
616 let Some(ptgi) = task_exec.ptgi.upgrade()
617 else
618 {
619 continue;
621 };
622
623 match ptgi.task.lock().unwrap().exec()
625 {
626 PeriodicTaskResult::Ok =>
627 {},
628 PeriodicTaskResult::CancelTask =>
629 {
630 self
631 .global_task_injector
632 .push(GlobalTasks::SuspendTask(task_exec.ptgi.clone(), None));
633
634 let _ = self.poll_int.aquire().map(|v| v.interrupt_drop());
635 },
636 PeriodicTaskResult::TaskReSchedule(ptt) =>
637 {
638 self
639 .global_task_injector
640 .push(GlobalTasks::ReschedTask(task_exec.ptgi.clone(), ptt, None));
641
642 let _ = self.poll_int.aquire().map(|v| v.interrupt_drop());
643 }
644 }
645
646 drop(task_exec);
647 }
648 }
649 }
650
651 let spti_lock_res = self.spti.try_lock();
652
653 if let Ok(mut task_token) = spti_lock_res
654 {
655 let thread_hndl_cnt = task_token.thread_pool.get().unwrap().len();
656
657 while let Steal::Success(task) = self.global_task_injector.steal()
659 {
660 match task
661 {
662 GlobalTasks::AddTask(ptt, opt_err_ret) =>
663 {
664
665 let timer =
666 match ptt.get_task_guard()
667 {
668 Ok(r) =>
669 {
670 r
671 },
672 Err(e) =>
673 {
674 if let Some(err_ret) = opt_err_ret
675 {
676 let _ = err_ret.send(Err(e));
677 }
678
679 continue;
680 }
681 };
682
683 if let Err(e) = timer.setup_timer(ptt.get_timer_time())
685 {
686 if let Some(err_ret) = opt_err_ret
687 {
688 let _ = err_ret.send(Err(e));
689 }
690
691 continue;
692 }
693
694 if let Err(e) = task_token.timers_poll.add(&timer.timer_fd)
696 {
697 if let Some(err_ret) = opt_err_ret
698 {
699 let _ = err_ret.send(Err(e));
700 }
701
702 continue;
703 }
704
705
706 let timer_fd = timer.as_fd().as_raw_fd();
707
708 task_token.tasks.insert(timer_fd, RefCell::new(ptt));
709
710 if let Some(err_ret) = opt_err_ret
711 {
712 let _ = err_ret.send(Ok(()));
713 }
714 },
715 GlobalTasks::RemoveTask(ptg_arc, opt_err_ret) =>
716 {
717 let _ = ptg_arc.timer_fd.unset_time();
718
719 if let Some(_v) = task_token.tasks.remove(&ptg_arc.as_fd().as_raw_fd())
720 {
721 let res = task_token.timers_poll.delete(&ptg_arc.timer_fd);
722
723 if let Some(err_ret) = opt_err_ret.as_ref()
724 {
725 let _ = err_ret.send(res);
726 }
727 }
728 else
729 {
730 if let Some(err_ret) = opt_err_ret
731 {
732 let _ = err_ret.send(Ok(()));
733 }
734 }
735
736 drop(ptg_arc);
737 },
738 GlobalTasks::ReschedTask( ptg_weak, ptt, opt_err_ret ) =>
739 {
740 let Some(ptg_arc) = ptg_weak.upgrade()
741 else
742 {
743 continue;
745 };
746
747 if let Err(e) = ptg_arc.setup_timer(&ptt)
748 {
749 if let Some(err_ret) = opt_err_ret.as_ref()
750 {
751 let _ = err_ret.send(Err(e));
752 }
753
754 continue;
755 }
756
757 let res_task =
759 task_token
760 .tasks
761 .get(&ptg_arc.timer_fd.as_raw_fd())
762 .ok_or_else(||
763 map_timer_err!(TimerErrorType::NotFound,
764 "thread: '{}', timer with FD: '{}' was not found", self.thread_name, ptg_arc.timer_fd)
765 );
766
767 let res =
768 match res_task
769 {
770 Ok(task) =>
771 {
772 let _ = mem::replace(&mut task.borrow_mut().ptt, ptt);
773
774 Ok(())
775 },
776 Err(err) =>
777 {
778 Err(err)
779 }
780 };
781
782 if let Some(err_ret) = opt_err_ret
783 {
784 let _ = err_ret.send(res);
785 }
786 },
787 GlobalTasks::SuspendTask(ptg_weak, opt_err_ret) =>
788 {
789 let Some(ptg_arc) = ptg_weak.upgrade()
790 else
791 {
792 continue;
794 };
795
796 let res = ptg_arc.unset_timer();
797
798 if let Some(err_ret) = opt_err_ret
799 {
800 let _ = err_ret.send(res);
801 }
802 },
803 GlobalTasks::ResumeTask(ptg_weak, opt_err_ret) =>
804 {
805 let Some(ptg_arc) = ptg_weak.upgrade()
806 else
807 {
808 continue;
810 };
811
812 let res_task =
813 task_token
814 .tasks
815 .get(&ptg_arc.timer_fd.as_raw_fd())
816 .ok_or_else(||
817 map_timer_err!(TimerErrorType::NotFound,
818 "thread: '{}', timer with FD: '{}' was not found", self.thread_name, ptg_arc.timer_fd)
819 );
820
821 if let Err(e) = res_task
822 {
823 if let Some(err_ret) = opt_err_ret.as_ref()
824 {
825 let _ = err_ret.send(Err(e));
826 }
827
828 continue;
829 }
830
831 let res =
832 ptg_arc.setup_timer(res_task.unwrap().borrow().get_timer_time());
833
834 if let Some(err_ret) = opt_err_ret.as_ref()
835 {
836 let _ = err_ret.send(res);
837 }
838 }
839 }
840 }
841
842 let res = task_token.timers_poll.poll(Some(5000))?;
844
845
846 for event in res.into_inner()
847 {
848 match event
849 {
850 PollEventType::Some(timer_fd) =>
851 {
852 let task =
854 task_token
855 .tasks
856 .get(&timer_fd)
857 .ok_or_else(||
858 map_timer_err!(TimerErrorType::NotFound,
859 "thread: '{}', timer with FD: '{}' was not found", self.thread_name, timer_fd)
860 )?;
861
862 let Some(ptg_arc) = task.borrow().ptg.upgrade()
864 else
865 {
866 task_token.tasks.remove(&timer_fd);
868
869 continue;
871 };
872
873 let timer_rd_res=
874 ptg_arc
875 .timer_fd
876 .read()
877 .map_err(|e|
878 map_timer_err!(TimerErrorType::TimerError(e.get_errno()),
879 "thread: '{}', timer with FD: '{}' err: {}", self.thread_name, timer_fd, e)
880 )?;
881
882 let overflow_cnt: u64 =
884 match timer_rd_res
885 {
886 TimerReadRes::Ok(overfl) =>
887 {
888 overfl
889 },
890 TimerReadRes::Cancelled =>
891 {
892 self
895 .global_task_injector
896 .push(
897 GlobalTasks::ReschedTask(task.borrow().ptg.clone(), task.borrow().ptt.clone(), None)
898 );
899
900 continue;
902 },
903 TimerReadRes::WouldBlock =>
904 {
905 panic!("assertion trap: timer retuned WouldBlock, {}", ptg_arc);
907 }
908 };
909
910 let ticket_arc_opt = task.borrow().weak_ticket.upgrade();
912
913
914 let ticket =
915 match ticket_arc_opt
916 {
917 Some(ticket) =>
918 ticket,
919 None =>
920 {
921 let task_thread =
922 {
923 self.thread_last_id = (self.thread_last_id + 1) % thread_hndl_cnt;
933
934 task_token.clone_thread_handler(self.thread_last_id)
936 };
937
938 let ticket =
939 {
940 Arc::new(
942 NewTaskTicket::new(task_thread, task.borrow().ptg.clone())
943 )
944 };
945
946 task.borrow_mut().weak_ticket = Arc::downgrade(&ticket);
947
948 ticket
949 }
950 };
951
952
953
954 NewTaskTicket::send_task(ticket, overflow_cnt, thread_hndl_cnt);
955 },
956 PollEventType::TimerRemoved(timer_fd) =>
957 {
958 task_token.tasks.remove(&timer_fd);
960 },
961 PollEventType::SubError(_timer_error) =>
962 {
963 },
965 }
966 } }
968 else if let Err(TryLockError::WouldBlock) = spti_lock_res
969 {
970 if self.thread_run_flag.load(Ordering::Acquire) == false
971 {
972 return Ok(());
973 }
974
975 if self.local_thread_inj.is_empty() == true
976 {
977 std::thread::park_timeout(Duration::from_secs(2));
978 }
979 }
980
981 } return Ok(());
984 }
985}
986
987
988#[derive(Debug)]
991struct ThreadHandler
992{
993 hndl: JoinHandle<TimerResult<()>>,
995
996 task_injector: Arc<Injector<ThreadTask>>,
998
999 thread_flag: Weak<AtomicBool>,
1001}
1002
1003impl ThreadHandler
1004{
1005 fn new(hndl: JoinHandle<TimerResult<()>>, task_injector: Arc<Injector<ThreadTask>>, thread_flag: Weak<AtomicBool>) -> Self
1006 {
1007 return
1008 Self
1009 {
1010 hndl,
1011 task_injector,
1012 thread_flag: thread_flag
1013 };
1014 }
1015
1016 fn stop(&self)
1017 {
1018 if let Some(v) = self.thread_flag.upgrade()
1019 {
1020 v.store(false, Ordering::Release);
1021 }
1022 }
1023
1024 fn unpark(&self)
1025 {
1026 self.hndl.thread().unpark();
1027 }
1028
1029 fn send_task(&self, task: Arc<NewTaskTicket>, unpark: bool)
1030 {
1031 self.task_injector.push(ThreadTask::TaskExec(task));
1032
1033 if unpark == true
1034 {
1035 self.hndl.thread().unpark();
1036 }
1037
1038 return;
1039 }
1040
1041 fn clean_local_queue(&self)
1042 {
1043 while let Steal::Success(_) = self.task_injector.steal() {}
1044
1045 return;
1046 }
1047}
1048
1049#[derive(Debug)]
1052pub struct SharedPeriodicTasks
1053{
1054 thread_pool: OnceCell<Arc<Vec<Arc<ThreadHandler>>>>,
1056
1057 tasks: HashMap<RawFd, RefCell<PeriodicTaskTicket>>,
1059
1060 timers_poll: TimerPoll,
1062}
1063
1064
1065impl SharedPeriodicTasks
1066{
1067 fn new() -> TimerResult<Self>
1068 {
1069
1070 return Ok(
1071 Self
1072 {
1073 thread_pool: OnceCell::default(),
1074 tasks: HashMap::new(),
1075 timers_poll: TimerPoll::new()?
1076 }
1077 );
1078 }
1079
1080 fn clone_thread_handler(&self, thread_last_id: usize) -> Arc<ThreadHandler>
1081 {
1082 let thread_local_hnd =
1083 self
1084 .thread_pool
1085 .get()
1086 .unwrap()[thread_last_id]
1087 .clone();
1088
1089 return thread_local_hnd;
1090 }
1091}
1092
1093#[derive(Debug)]
1094pub struct SyncPeriodicTasksInner
1095{
1096 poll_int: PollInterrupt,
1098
1099 task_injector: Arc<Injector<GlobalTasks>>,
1101}
1102
1103impl SyncPeriodicTasksInner
1104{
1105 fn send_global_cmd(&self, glob: GlobalTasks) -> TimerResult<()>
1106 {
1107 let poll_int =
1108 self.poll_int.aquire()?;
1109
1110 self.task_injector.push(glob);
1111
1112 poll_int.interrupt_drop()?;
1113
1114 return Ok(());
1115 }
1116
1117 fn clear_global_queue(&self)
1118 {
1119 while let Steal::Success(_) = self.task_injector.steal() {}
1120
1121 return;
1122 }
1123}
1124
1125#[derive(Debug, Clone)]
1137pub struct SyncPeriodicTasks
1138{
1139 threads: Option<Arc<Vec<Arc<ThreadHandler>>>>,
1140
1141 inner: Arc<SyncPeriodicTasksInner>,
1142}
1143
1144impl Drop for SyncPeriodicTasks
1145{
1146 fn drop(&mut self)
1147 {
1148 self.inner.clear_global_queue();
1149
1150 let mut threads = self.threads.take().unwrap();
1151
1152 for thread in threads.iter()
1154 {
1155 thread.stop();
1156 thread.unpark();
1157 }
1158
1159 let _ = self.inner.poll_int.aquire().map(|v| v.interrupt_drop());
1161
1162 for _ in 0..5
1163 {
1164 let threads_unwr =
1165 match Arc::try_unwrap(threads)
1166 {
1167 Ok(r) => r,
1168 Err(e) =>
1169 {
1170 threads = e;
1171
1172 std::thread::sleep(Duration::from_millis(500));
1173
1174 continue;
1175
1176 }
1177 };
1178
1179 for thread in threads_unwr
1180 {
1181 thread.clean_local_queue();
1182
1183 let Some(thread) = Arc::into_inner(thread)
1184 else
1185 {
1186 panic!("assertion trap: ~SyncPeriodicTasks, a reference to ThreadHandler left somewhere");
1187 };
1188
1189 let _ = thread.hndl.join();
1190 }
1191
1192 break;
1193 }
1194 }
1195}
1196
1197
1198impl SyncPeriodicTasks
1199{
1200
1201
1202 pub
1215 fn new(threads_cnt: NonZeroUsize) -> TimerResult<Self>
1216 {
1217 let spti = SharedPeriodicTasks::new()?;
1218 let poll_int = spti.timers_poll.get_poll_interruptor();
1219
1220 let spti = Arc::new(Mutex::new(spti));
1222
1223 let task_injector = Arc::new(Injector::<GlobalTasks>::new());
1224
1225 let mut thread_hndls: Vec<Arc<ThreadHandler>> = Vec::with_capacity(threads_cnt.get());
1226
1227 for i in 0..threads_cnt.get()
1229 {
1230 let handler =
1231 ThreadWorker::new(format!("timer_exec/{}s", i), task_injector.clone(), spti.clone(), poll_int.clone())?;
1232
1233 thread_hndls.push(Arc::new(handler));
1234 }
1235
1236 let thread_hndls = Arc::new(thread_hndls);
1237
1238 let spti_lock = spti.lock().unwrap();
1240 spti_lock.thread_pool.get_or_init(|| thread_hndls.clone());
1241
1242 let thread =
1244 spti_lock
1245 .thread_pool
1246 .get()
1247 .unwrap()
1248 .get(random_range(0..threads_cnt.get()))
1249 .unwrap()
1250 .clone();
1251
1252 drop(spti_lock);
1253
1254 thread.unpark();
1256
1257 let inner =
1258 SyncPeriodicTasksInner
1259 {
1260 poll_int: poll_int,
1261 task_injector: task_injector,
1262 };
1263
1264 return Ok(
1265 Self
1266 {
1267 threads: Some(thread_hndls),
1268 inner: Arc::new(inner),
1269 }
1270 );
1271 }
1272
1273 pub
1287 fn add<T>(&self, task_name: impl Into<String>, task: T, task_time: PeriodicTaskTime) -> TimerResult<PeriodicTaskGuard>
1288 where T: PeriodicTask
1289 {
1290 let task_int: PeriodicTaskHndl = Box::new(task);
1291
1292 let task_name_str: String = task_name.into();
1293
1294 let period_task_guard =
1295 Arc::new(PeriodicTaskGuardInner::new(task_name_str.clone(), task_int)?);
1296
1297
1298 let period_task_ticket =
1299 PeriodicTaskTicket::new(task_name_str.clone(), task_time, Arc::downgrade(&period_task_guard));
1300
1301 let (mpsc_send, mpsc_recv) = mpsc::channel();
1302
1303 self.inner.send_global_cmd(GlobalTasks::AddTask(period_task_ticket, Some(mpsc_send)) )?;
1304
1305 let _ =
1306 mpsc_recv
1307 .recv()
1308 .map_err(|e|
1309 map_timer_err!(TimerErrorType::ExternalError, "mpsc error: {}", e)
1310 )??;
1311
1312 let ret =
1313 PeriodicTaskGuard
1314 {
1315 task_name: task_name_str,
1316 guard: Some(period_task_guard),
1317 spt: self.inner.clone()
1318 };
1319
1320 return Ok(ret);
1321 }
1322
1323 pub
1333 fn check_thread_status(&self) -> Option<String>
1334 {
1335 for thread in self.threads.as_ref().unwrap().iter()
1336 {
1337 if let None = thread.thread_flag.upgrade()
1338 {
1339 return Some(thread.hndl.thread().name().unwrap().to_string());
1340 }
1341 }
1342
1343 return None;
1344 }
1345}
1346
1347#[cfg(test)]
1348mod tests
1349{
1350 use std::{sync::mpsc::{self, RecvTimeoutError, Sender}, time::{Duration, Instant}};
1351
1352 use crate::{periodic_task::sync_tasks::{PeriodicTask, PeriodicTaskResult, PeriodicTaskTime, SyncPeriodicTasks}, AbsoluteTime, RelativeTime};
1353
1354 #[derive(Debug)]
1355 struct TaskStruct1
1356 {
1357 a1: u64,
1358 s: Sender<u64>,
1359 }
1360
1361 impl TaskStruct1
1362 {
1363 fn new(a1: u64, s: Sender<u64>) -> Self
1364 {
1365 return Self{ a1: a1, s };
1366 }
1367 }
1368
1369 impl PeriodicTask for TaskStruct1
1370 {
1371 fn exec(&mut self) -> PeriodicTaskResult
1372 {
1373 println!("taskstruct1 val: {}", self.a1);
1374
1375 let _ = self.s.send(self.a1);
1376
1377 return PeriodicTaskResult::Ok;
1378 }
1379 }
1380
1381 #[derive(Debug)]
1382 struct TaskStruct2
1383 {
1384 a1: u64,
1385 s: Sender<u64>,
1386 }
1387
1388 impl TaskStruct2
1389 {
1390 fn new(a1: u64, s: Sender<u64>) -> Self
1391 {
1392 return Self{ a1: a1, s };
1393 }
1394 }
1395
1396 impl PeriodicTask for TaskStruct2
1397 {
1398 fn exec(&mut self) -> PeriodicTaskResult
1399 {
1400 println!("taskstruct2 val: {}", self.a1);
1401
1402 self.s.send(self.a1).unwrap();
1403
1404 return PeriodicTaskResult::TaskReSchedule(PeriodicTaskTime::exact_time(AbsoluteTime::now() + RelativeTime::new_time(2, 0)));
1405 }
1406 }
1407
1408 #[test]
1409 fn test1_absolute_simple()
1410 {
1411 let s = SyncPeriodicTasks::new(1.try_into().unwrap()).unwrap();
1412
1413 let (send, recv) = mpsc::channel::<u64>();
1414
1415 let task1 = TaskStruct1::new(2, send);
1416 let task1_ptt = PeriodicTaskTime::exact_time(AbsoluteTime::now() + RelativeTime::new_time(3, 0));
1417 let task1_guard = s.add("task1", task1, task1_ptt).unwrap();
1418
1419 println!("added");
1420 let val = recv.recv();
1421
1422 println!("{:?}", val);
1423
1424 drop(task1_guard);
1425 }
1426
1427 #[test]
1428 fn test1_relative_simple()
1429 {
1430 let s = SyncPeriodicTasks::new(1.try_into().unwrap()).unwrap();
1431
1432 let (send, recv) = mpsc::channel::<u64>();
1433
1434 let task1 = TaskStruct1::new(2, send);
1435 let task1_ptt = PeriodicTaskTime::interval(RelativeTime::new_time(1, 0));
1436 let task1_guard = s.add("task1", task1, task1_ptt).unwrap();
1437
1438 let mut s = Instant::now();
1439
1440 for i in 0..3
1441 {
1442 let val = recv.recv().unwrap();
1443
1444 let e = s.elapsed();
1445 s = Instant::now();
1446
1447 println!("{}: {:?} {:?} {}", i, val, e, e.as_micros());
1448
1449 assert!(999000 < e.as_micros() && e.as_micros() < 10001200);
1450 assert_eq!(val, 2);
1451 }
1452
1453 drop(task1_guard);
1454
1455 std::thread::sleep(Duration::from_millis(100));
1456
1457 return;
1458 }
1459
1460 #[test]
1461 fn test1_relative_resched_to_abs()
1462 {
1463 let s = SyncPeriodicTasks::new(1.try_into().unwrap()).unwrap();
1464
1465 let (send, recv) = mpsc::channel::<u64>();
1466
1467 let task1 = TaskStruct2::new(2, send);
1468 let task1_ptt = PeriodicTaskTime::interval(RelativeTime::new_time(1, 0));
1469 let task1_guard = s.add("task1", task1, task1_ptt).unwrap();
1470
1471 let s = Instant::now();
1472 match recv.recv_timeout(Duration::from_millis(1150))
1473 {
1474 Ok(rcv_a) =>
1475 {
1476 let e = s.elapsed();
1477 println!("{:?} {}", e, e.as_micros());
1478 assert_eq!(rcv_a, 2);
1479 assert!(999051 < e.as_micros() && e.as_micros() < 1000551);
1480 },
1481 Err(RecvTimeoutError::Timeout) =>
1482 panic!("tineout"),
1483 Err(e) =>
1484 panic!("{}", e),
1485 }
1486
1487 let s = Instant::now();
1488 match recv.recv_timeout(Duration::from_millis(2100))
1489 {
1490 Ok(rcv_a) =>
1491 {
1492 let e = s.elapsed();
1493 println!("{:?} {}", e, e.as_micros());
1494 assert_eq!(rcv_a, 2);
1495 assert!(1999642 < e.as_micros() && e.as_micros() < 2000342);
1496 },
1497 Err(RecvTimeoutError::Timeout) =>
1498 panic!("tineout"),
1499 Err(e) =>
1500 panic!("{}", e),
1501 }
1502
1503 let s = Instant::now();
1504 match recv.recv_timeout(Duration::from_millis(2100))
1505 {
1506 Ok(rcv_a) =>
1507 {
1508 let e = s.elapsed();
1509 println!("{:?} {}", e, e.as_micros());
1510 assert_eq!(rcv_a, 2);
1511 assert!(1999642 < e.as_micros() && e.as_micros() < 2000342);
1512 },
1513 Err(RecvTimeoutError::Timeout) =>
1514 panic!("tineout"),
1515 Err(e) =>
1516 panic!("{}", e),
1517 }
1518
1519 drop(task1_guard);
1520
1521 std::thread::sleep(Duration::from_millis(100));
1522
1523 return;
1524 }
1525
1526 #[test]
1527 fn test1_relative_simple_resched()
1528 {
1529 let s = SyncPeriodicTasks::new(1.try_into().unwrap()).unwrap();
1530
1531 let (send, recv) = mpsc::channel::<u64>();
1532
1533 let task1 = TaskStruct1::new(2, send);
1534 let task1_ptt =
1535 PeriodicTaskTime::interval(
1536 RelativeTime::new_time(1, 0)
1537 );
1538 let task1_guard = s.add("task1", task1, task1_ptt).unwrap();
1539
1540 let mut s = Instant::now();
1541
1542 for i in 0..3
1543 {
1544 let val = recv.recv().unwrap();
1545
1546 let e = s.elapsed();
1547 s = Instant::now();
1548
1549 println!("{}: {:?} {:?} {}", i, val, e, e.as_micros());
1550
1551 assert!(999000 < e.as_micros() && e.as_micros() < 10001200);
1552 assert_eq!(val, 2);
1553 }
1554
1555 task1_guard
1556 .reschedule_task(
1557 PeriodicTaskTime::exact_time(AbsoluteTime::now() + RelativeTime::new_time(2, 0))
1558 )
1559 .unwrap();
1560
1561 s = Instant::now();
1562 let val = recv.recv().unwrap();
1563 let e = s.elapsed();
1564
1565 println!("resched: {:?} {:?} {}", val, e, e.as_micros());
1566
1567 assert!(1999000 < e.as_micros() && e.as_micros() < 2000560);
1568
1569 let val = recv.recv_timeout(Duration::from_secs(3));
1570
1571 assert_eq!(val.is_err(), true);
1572 assert_eq!(val.err().unwrap(), RecvTimeoutError::Timeout);
1573
1574 drop(task1_guard);
1575
1576 std::thread::sleep(Duration::from_millis(100));
1577
1578 return;
1579 }
1580
1581 #[test]
1582 fn test1_relative_simple_cancel()
1583 {
1584 let s = SyncPeriodicTasks::new(1.try_into().unwrap()).unwrap();
1585
1586 let (send, recv) = mpsc::channel::<u64>();
1587
1588 let task1 = TaskStruct1::new(0, send.clone());
1589 let task1_ptt =
1590 PeriodicTaskTime::interval(RelativeTime::new_time(1, 0));
1591
1592 let task2 = TaskStruct1::new(1, send.clone());
1593 let task2_ptt =
1594 PeriodicTaskTime::interval(RelativeTime::new_time(2, 0));
1595
1596 let task3 = TaskStruct1::new(2, send);
1597 let task3_ptt =
1598 PeriodicTaskTime::interval(RelativeTime::new_time(0, 500_000_000));
1599
1600 let task1_guard = s.add("task1", task1, task1_ptt).unwrap();
1601 let task2_guard = s.add("task2", task2, task2_ptt).unwrap();
1602 let task3_guard = s.add("task3", task3, task3_ptt).unwrap();
1603
1604 let mut a_cnt: [u8; 3] = [0_u8; 3];
1605
1606 let end = AbsoluteTime::now() + RelativeTime::new_time(5, 100_000_000);
1607
1608 while AbsoluteTime::now() < end
1609 {
1610 match recv.recv_timeout(Duration::from_millis(1))
1611 {
1612 Ok(rcv_a) =>
1613 a_cnt[rcv_a as usize] += 1,
1614 Err(RecvTimeoutError::Timeout) =>
1615 continue,
1616 Err(e) =>
1617 panic!("{}", e),
1618 }
1619
1620
1621 }
1622
1623 assert_eq!(a_cnt[0], 5);
1624 assert_eq!(a_cnt[1], 2);
1625 assert_eq!(a_cnt[2], 10);
1626
1627 task3_guard.suspend_task().unwrap();
1629
1630
1631 let end = AbsoluteTime::now() + RelativeTime::new_time(5, 100_000_000);
1632
1633 while AbsoluteTime::now() < end
1634 {
1635 match recv.recv_timeout(Duration::from_millis(1))
1636 {
1637 Ok(rcv_a) =>
1638 a_cnt[rcv_a as usize] += 1,
1639 Err(RecvTimeoutError::Timeout) =>
1640 continue,
1641 Err(e) =>
1642 panic!("{}", e),
1643 }
1644
1645
1646 }
1647
1648 assert_eq!(a_cnt[0] > 5, true);
1649 assert_eq!(a_cnt[1] > 2, true);
1650 assert!((a_cnt[2] == 10 || a_cnt[2] == 11));
1651
1652 drop(task1_guard);
1653 drop(task2_guard);
1654 drop(task3_guard);
1655
1656 let end = AbsoluteTime::now() + RelativeTime::new_time(5, 100_000_000);
1657
1658 while AbsoluteTime::now() < end
1659 {
1660 match recv.recv_timeout(Duration::from_millis(1))
1661 {
1662 Ok(rcv_a) =>
1663 a_cnt[rcv_a as usize] += 1,
1664 Err(RecvTimeoutError::Timeout) =>
1665 continue,
1666 Err(_) =>
1667 break,
1668 }
1669
1670
1671 }
1672
1673 assert_eq!(AbsoluteTime::now() < end, true);
1674
1675
1676
1677 return;
1678 }
1679
1680 #[test]
1682 fn test2_multithread_1()
1683 {
1684 let s = SyncPeriodicTasks::new(2.try_into().unwrap()).unwrap();
1685
1686 let (send, recv) = mpsc::channel::<u64>();
1687
1688 let task1 = TaskStruct1::new(0, send.clone());
1689 let task1_ptt =
1690 PeriodicTaskTime::interval(RelativeTime::new_time(1, 0));
1691
1692 let task2 = TaskStruct1::new(1, send.clone());
1693 let task2_ptt =
1694 PeriodicTaskTime::interval(RelativeTime::new_time(2, 0));
1695
1696 let task3 = TaskStruct1::new(2, send.clone());
1697 let task3_ptt =
1698 PeriodicTaskTime::interval(RelativeTime::new_time(0, 500_000_000));
1699
1700 let task4 = TaskStruct1::new(3, send.clone());
1701 let task4_ptt =
1702 PeriodicTaskTime::interval(RelativeTime::new_time(0, 200_000_000));
1703
1704 let task5 = TaskStruct1::new(4, send.clone());
1705 let task5_ptt =
1706 PeriodicTaskTime::exact_time(AbsoluteTime::now() + RelativeTime::new_time(5, 0));
1707
1708 let task1_guard = s.add("task1", task1, task1_ptt).unwrap();
1709 let task2_guard = s.add("task2", task2, task2_ptt).unwrap();
1710 let task3_guard = s.add("task3", task3, task3_ptt).unwrap();
1711 let task4_guard = s.add("task4", task4, task4_ptt).unwrap();
1712 let task5_guard = s.add("task5", task5, task5_ptt).unwrap();
1713
1714
1715 let mut a_cnt: [u8; 5] = [0_u8; 5];
1716
1717 let end = AbsoluteTime::now() + RelativeTime::new_time(5, 500_000_000);
1718
1719 while AbsoluteTime::now() < end
1720 {
1721 match recv.recv_timeout(Duration::from_millis(1))
1722 {
1723 Ok(rcv_a) =>
1724 a_cnt[rcv_a as usize] += 1,
1725 Err(RecvTimeoutError::Timeout) =>
1726 continue,
1727 Err(e) =>
1728 panic!("{}", e),
1729 }
1730
1731
1732 }
1733
1734 println!("{:?}", a_cnt);
1735
1736 assert!(a_cnt[0] == 5);
1737 assert!(a_cnt[1] == 2);
1738 assert!((a_cnt[2] == 10 || a_cnt[2] == 11));
1739 assert!(a_cnt[3] == 27);
1740 assert!(a_cnt[4] == 1);
1741
1742 task5_guard.reschedule_task(PeriodicTaskTime::exact_time(AbsoluteTime::now() + RelativeTime::new_time(0, 500_000_000))).unwrap();
1743
1744 let end = AbsoluteTime::now() + RelativeTime::new_time(0, 600_000_000);
1745
1746 while AbsoluteTime::now() < end
1747 {
1748 match recv.recv_timeout(Duration::from_millis(1))
1749 {
1750 Ok(rcv_a) =>
1751 a_cnt[rcv_a as usize] += 1,
1752 Err(RecvTimeoutError::Timeout) =>
1753 continue,
1754 Err(e) =>
1755 panic!("{}", e),
1756 }
1757 }
1758
1759 println!("{:?}", a_cnt);
1760 assert!(a_cnt[4] == 2);
1761
1762 drop(task5_guard);
1763 drop(task4_guard);
1764 drop(task3_guard);
1765 drop(task2_guard);
1766
1767 let end = AbsoluteTime::now() + RelativeTime::new_time(2, 1000);
1768
1769 while AbsoluteTime::now() < end
1770 {
1771 match recv.recv_timeout(Duration::from_millis(1))
1772 {
1773 Ok(rcv_a) =>
1774 a_cnt[rcv_a as usize] += 1,
1775 Err(RecvTimeoutError::Timeout) =>
1776 continue,
1777 Err(e) =>
1778 panic!("{}", e),
1779 }
1780 }
1781
1782
1783 println!("{:?}", a_cnt);
1784 assert_eq!(a_cnt[4], 2);
1785 assert_eq!(a_cnt[0], 8);
1786
1787 drop(task1_guard);
1788
1789 std::thread::sleep(Duration::from_millis(10));
1790 return;
1791 }
1792}