1use std::fmt;
16
17use crate::
18{
19 common,
20 error::{TimerErrorType, TimerResult},
21 map_timer_err,
22 timer::{OrderedTimerDeque, OrderedTimerDequeIntrf},
23 timer_err,
24 timer_portable::timer::TimerExpMode, TimerReadRes
25};
26
/// Contract for a signal handler that can be stored in the timer queue and
/// notified once its timeout is reached.
pub trait TimerDequeueSignal
{
    /// Identifier type of a queued signal. Queue removal compares the value
    /// returned by [`TimerDequeueSignal::get_id`] against the requested id.
    type TimerQueueID: Eq + PartialEq + fmt::Display + fmt::Debug;

    /// Error type reported when the timeout notification cannot be delivered.
    type TimeoutErr: fmt::Display + fmt::Debug;

    /// Returns the identifier of this signal handler.
    fn get_id(&self) -> Self::TimerQueueID;

    /// Synchronous timeout notification. Consumes the handler.
    ///
    /// The provided implementation does nothing and reports success.
    fn sig_timeout(self) -> Result<(), Self::TimeoutErr>
    where
        Self: Sized
    {
        Ok(())
    }

    /// Asynchronous timeout notification. Consumes the handler.
    ///
    /// The provided implementation does nothing and reports success.
    async fn a_sig_timeout(self) -> Result<(), Self::TimeoutErr>
    where
        Self: Sized
    {
        Ok(())
    }
}
59
60#[derive(Debug)]
68pub struct TimerDequeueSignalTicket<SIG: TimerDequeueSignal + fmt::Debug + fmt::Display + Eq + PartialEq>
69{
70 signal: SIG,
72
73 timeout_absolute: TimerExpMode,
75}
76
77impl<SIG: TimerDequeueSignal + fmt::Debug + fmt::Display + Eq + PartialEq> TimerDequeueSignalTicket<SIG>
78{
79 pub
80 fn new(sig_hnd: SIG, abs_time_sec: i64, abs_time_nsec: i64) -> TimerResult<Self>
81 {
82 let abs_time_nsec = abs_time_nsec & 999_999_999;
83
84 let cur = common::get_current_timestamp();
85 let req = TimerExpMode::OneShot { sec: abs_time_sec, nsec: abs_time_nsec };
86
87 if TimerExpMode::from(cur) > req
88 {
89 timer_err!(TimerErrorType::Expired, "time already expired");
90 }
91
92
93 return Ok(
94 Self
95 {
96 signal:
97 sig_hnd,
98 timeout_absolute:
99 req
100 }
101 );
102 }
103
104 fn get_time_until(&self) -> TimerExpMode
105 {
106 return self.timeout_absolute;
107 }
108
109 fn send_sig_timeout(self) -> TimerResult<()>
110 {
111 return
112 self
113 .signal
114 .sig_timeout()
115 .map_err(|e|
116 map_timer_err!(TimerErrorType::ExternalError, "cannot send signal, error: {}", e)
117 );
118 }
119
120 async
121 fn async_send_sig_timeout(self) -> TimerResult<()>
122 {
123 return
124 self
125 .signal
126 .a_sig_timeout()
127 .await
128 .map_err(|e|
129 map_timer_err!(TimerErrorType::ExternalError, "cannot send signal, error: {}", e)
130 );
131 }
132}
133
134impl<SIG: TimerDequeueSignal + fmt::Debug + fmt::Display + Eq + PartialEq> fmt::Display for TimerDequeueSignalTicket<SIG>
135{
136 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> Result<(), std::fmt::Error>
137 {
138 write!(f, "{} until: {}", self.signal.get_id(), self.timeout_absolute)
139 }
140}
141
142impl<SIG: TimerDequeueSignal + fmt::Debug + fmt::Display + Eq + PartialEq> Eq for TimerDequeueSignalTicket<SIG> {}
143
144impl<SIG: TimerDequeueSignal + fmt::Debug + fmt::Display + Eq + PartialEq> PartialEq for TimerDequeueSignalTicket<SIG>
145{
146 fn eq(&self, other: &Self) -> bool
147 {
148 return self.signal == other.signal;
149 }
150}
151
152
153impl<SIG: TimerDequeueSignal + fmt::Debug + fmt::Display + Eq + PartialEq> Ord for TimerDequeueSignalTicket<SIG>
154{
155 fn cmp(&self, other: &Self) -> std::cmp::Ordering
156 {
157 return self.timeout_absolute.cmp(&other.timeout_absolute);
158 }
159}
160
161impl<SIG: TimerDequeueSignal + fmt::Debug + fmt::Display + Eq + PartialEq> PartialOrd for TimerDequeueSignalTicket<SIG>
162{
163 fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering>
164 {
165 return Some(self.cmp(other));
166 }
167}
168
169impl<SIG: TimerDequeueSignal + fmt::Debug + fmt::Display + Eq + PartialEq> OrderedTimerDequeIntrf for TimerDequeueSignalTicket<SIG>
170{
171 type Target = SIG;
172 type Ticket = common::NoTicket;
173
174 #[inline]
175 fn wrap(target: Self::Target, abs_time_sec: i64, abs_time_nsec: i64) -> TimerResult<(Self, Self::Ticket)>
176 {
177 return Ok((Self::new(target, abs_time_sec, abs_time_nsec)?, common::NoTicket));
178 }
179
180 #[inline]
181 fn get_timeout_absolute(&self) -> TimerExpMode
182 {
183 return self.timeout_absolute;
184 }
185}
186
187impl<SIG> OrderedTimerDeque<TimerDequeueSignalTicket<SIG>>
188where SIG: TimerDequeueSignal + fmt::Debug + fmt::Display + Eq + PartialEq
189{
190 #[inline]
209 pub
210 fn add_to_timer(&mut self, sig_hnd: SIG, abs_time_sec: i64, abs_time_nsec: i64) -> TimerResult<common::NoTicket>
211 {
212 return self.add_to_timer_local(sig_hnd, abs_time_sec, abs_time_nsec);
213 }
214
215 pub
229 fn remove_from_sched_queue(&mut self, arg_uniq_id: &SIG::TimerQueueID) -> TimerResult<Option<()>>
230 {
231 if self.deque_timeout_list.len() == 0
232 {
233 timer_err!(TimerErrorType::QueueEmpty, "queue list is empty!");
234 }
235 else
236 {
237 if self.deque_timeout_list.len() == 1
239 {
240 let _ = self.deque_timeout_list.pop_front().unwrap();
242
243 self.stop_timer()?;
245
246 return Ok(Some(()));
247 }
248 else
249 {
250 for (pos, q_item)
254 in self.deque_timeout_list.iter().enumerate()
255 {
256 if &q_item.signal.get_id() == arg_uniq_id
257 {
258 let _ =
260 self.deque_timeout_list.remove(pos);
261
262 if pos == 0
264 {
265 self.reschedule_timer()?;
266 }
267
268 return Ok(Some(()));
269 }
270 }
271
272 return Ok(None);
273 }
274 }
275 }
276
277 pub
292 fn timeout_event_handler(&mut self, res: TimerReadRes<u64>) -> TimerResult<()>
293 {
294 if let TimerReadRes::WouldBlock = res
296 {
297 return Ok(());
298 }
299
300 let force_timer_resched =
302 TimerReadRes::Cancelled == res || TimerReadRes::Ok(0) == res;
303
304 let mut error: TimerResult<()> = Ok(());
305 let mut last_time_until: TimerExpMode = TimerExpMode::None;
306
307 let cur_timestamp = common::get_current_timestamp();
308
309 loop
310 {
311 let Some(front_entity) =
313 self.deque_timeout_list.front()
314 else
315 {
316 self.stop_timer()?;
317
318 return Ok(());
319 };
320
321 let time_until = front_entity.get_time_until();
322
323 if time_until <= TimerExpMode::from(cur_timestamp)
324 {
325 let front_entity = self.deque_timeout_list.pop_front().unwrap();
326
327 error = front_entity.send_sig_timeout();
329 }
330 else
331 {
332 if last_time_until != TimerExpMode::None || force_timer_resched == true
334 {
335 self.reschedule_timer()?;
337 }
338
339 break;
340 }
341
342 last_time_until = time_until;
343 } return error;
347 }
348
349 pub async
366 fn async_timeout_event_handler(&mut self, res: TimerReadRes<u64>) -> TimerResult<()>
367 {
368 if let TimerReadRes::WouldBlock = res
370 {
371 return Ok(());
372 }
373
374 let force_timer_resched =
376 TimerReadRes::Cancelled == res || TimerReadRes::Ok(0) == res;
377
378 let mut error: TimerResult<()> = Ok(());
379 let mut last_time_until: TimerExpMode = TimerExpMode::None;
380
381 let cur_timestamp = common::get_current_timestamp();
382
383 loop
384 {
385 let Some(front_entity) =
387 self.deque_timeout_list.front()
388 else
389 {
390 self.stop_timer()?;
391
392 return Ok(());
393 };
394
395 let time_until = front_entity.get_time_until();
396
397 if time_until <= TimerExpMode::from(cur_timestamp)
398 {
399 let front_entity = self.deque_timeout_list.pop_front().unwrap();
400
401 error = front_entity.async_send_sig_timeout().await;
403 }
404 else
405 {
406 if last_time_until != TimerExpMode::None || force_timer_resched == true
408 {
409 self.reschedule_timer()?;
411 }
412
413 break;
414 }
415
416 last_time_until = time_until;
417 } return error;
421 }
422
423}
424
425
426#[cfg(test)]
427mod tests
428{
429 use std::{collections::VecDeque, fmt, sync::mpsc::{self, SendError, Sender}, time::{Duration, Instant}};
430
431 use crate::{common, timer::OrderedTimerDeque, timer_portable::timer::TimerReadRes, timer_signal::{TimerDequeueSignal, TimerDequeueSignalTicket}};
432
433 #[test]
434 fn test_timer_test()
435 {
436 #[derive(Debug)]
437 struct TestSigStruct
438 {
439 uniq_id: u64,
440 mpsc_sender: Sender<u64>,
441 }
442
443 impl Eq for TestSigStruct{}
444
445 impl PartialEq for TestSigStruct
446 {
447 fn eq(&self, other: &Self) -> bool
448 {
449 return self.uniq_id == other.uniq_id;
450 }
451 }
452
453 impl fmt::Display for TestSigStruct
454 {
455 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result
456 {
457 write!(f, "sig handler timer id: {}", self.uniq_id)
458 }
459 }
460
461 impl TimerDequeueSignal for TestSigStruct
462 {
463 type TimerQueueID = u64;
464
465 type TimeoutErr = SendError<Self::TimerQueueID>;
466
467 fn get_id(&self) -> Self::TimerQueueID
468 {
469 return self.uniq_id;
470 }
471
472 fn sig_timeout(self) -> Result<(), Self::TimeoutErr>
473 {
474 return self.mpsc_sender.send(self.uniq_id);
475 }
476 }
477
478 let mut time_list =
479 OrderedTimerDeque
480 ::<TimerDequeueSignalTicket<TestSigStruct>>
481 ::new("test_label".into(), 4, false).unwrap();
482
483
484 let s = Instant::now();
485
486 let tss_set = common::get_current_timestamp().timestamp()+2;
487
488 let (snd, rcv) = mpsc::channel::<u64>();
489
490 let sig_tm = TestSigStruct{ uniq_id: 456, mpsc_sender: snd };
491
492 println!("signal handler: {}", sig_tm);
493
494 let _ = time_list.add_to_timer(sig_tm, tss_set, 0).unwrap();
495
496 for _ in 0..3
497 {
498 let event = time_list.wait_for_event().unwrap();
499 if let TimerReadRes::WouldBlock = event
500 {
501 std::thread::sleep(Duration::from_millis(1001));
502
503 continue;
504 }
505
506 assert_eq!(event, TimerReadRes::Ok(1));
507
508 let e = s.elapsed();
509 let ts = common::get_current_timestamp();
510
511 assert_eq!(ts.timestamp(), tss_set);
512 println!("instant: {:?} ts: {}, timerset: {}", e, ts.timestamp(), tss_set);
513
514 time_list.timeout_event_handler(event).unwrap();
515
516 let timer_id = rcv.recv_timeout(Duration::from_secs(1)).unwrap();
517
518 assert_eq!(timer_id, 456);
519
520 return;
521 }
522
523 panic!("timeout befor timer!");
524 }
525
526 #[test]
527 fn test_pop()
528 {
529 let mut ve = VecDeque::<[u8; 32]>::with_capacity(5);
530
531 ve.push_back([0_u8; 32]);
532 ve.push_back([1_u8; 32]);
533 ve.push_back([2_u8; 32]);
534 ve.push_back([3_u8; 32]);
535
536 let mut small: Option<Duration> = None;
537 let mut large: Option<Duration> = None;
538 let mut some_cnt: u32 = 0;
539 let mut mean: u128 = 0;
540
541 for _ in 0..5000
542 {
543 let s = Instant::now();
544
545 let v = ve.pop_front().unwrap();
546
547 if v.len() == 32
548 {
549 some_cnt += 1;
550 }
551
552 ve.push_front(v);
553
554 let e = s.elapsed();
555
556 if small.is_none() == true
557 {
558 small = Some(e);
559 }
560 else if small.as_ref().unwrap() > &e
561 {
562 small = Some(e);
563 }
564
565 if large.is_none() == true
566 {
567 large = Some(e);
568 }
569 else if large.as_ref().unwrap() < &e
570 {
571 large = Some(e);
572 }
573
574 mean += e.as_nanos();
575 }
576
577 println!("s: {:?}, e: {:?}, mean: {} some_cnt: {}", small.unwrap(), large.unwrap(), mean/100, some_cnt);
578 }
579
580 #[test]
581 fn test_pop2()
582 {
583 let mut ve = VecDeque::<[u8; 32]>::with_capacity(5);
584
585 ve.push_back([0_u8; 32]);
586 ve.push_back([1_u8; 32]);
587 ve.push_back([2_u8; 32]);
588 ve.push_back([3_u8; 32]);
589
590 let mut small: Option<Duration> = None;
591 let mut large: Option<Duration> = None;
592 let mut some_cnt: u32 = 0;
593 let mut mean: u128 = 0;
594
595 for _ in 0..5000
596 {
597 let s = Instant::now();
598
599 let v = ve.front().unwrap();
600
601 if v.len() == 32
602 {
603 some_cnt += 1;
604 }
605
606 let v = ve.pop_front().unwrap();
607
608
609 let e = s.elapsed();
610 ve.push_front(v);
611
612 if small.is_none() == true
613 {
614 small = Some(e);
615 }
616 else if small.as_ref().unwrap() > &e
617 {
618 small = Some(e);
619 }
620
621 if large.is_none() == true
622 {
623 large = Some(e);
624 }
625 else if large.as_ref().unwrap() < &e
626 {
627 large = Some(e);
628 }
629
630 mean += e.as_nanos();
631 }
632
633 println!("s: {:?}, e: {:?}, mean: {} some_cnt: {}", small.unwrap(), large.unwrap(), mean/100, some_cnt);
634 }
635}