1use std::fmt;
16
17use crate::
18{
19 common,
20 error::{TimerErrorType, TimerResult},
21 map_timer_err,
22 timer::{OrderedTimerDeque, OrderedTimerDequeIntrf},
23 timer_err,
24 timer_portable::timer::TimerExpMode, TimerReadRes
25};
26
/// Contract for an item that can be stored in the signal-flavoured timer queue
/// and notified once its deadline has passed.
///
/// Only [`TimerDequeueSignal::get_id`] is mandatory. Both notification hooks
/// ship with a default body that does nothing and reports success, so an
/// implementor overrides just the flavour (blocking or `async`) it needs.
pub trait TimerDequeueSignal
{
    /// Identifier type of a queued item; it must be comparable and printable.
    type TimerQueueID: Eq + PartialEq + fmt::Display + fmt::Debug;

    /// Error type returned when delivering the timeout notification fails.
    type TimeoutErr: fmt::Display + fmt::Debug;

    /// Returns the identifier of this item.
    fn get_id(&self) -> Self::TimerQueueID;

    /// Blocking timeout notification. Consumes the item.
    ///
    /// The default implementation is a no-op returning `Ok(())`.
    fn sig_timeout(self) -> Result<(), Self::TimeoutErr>
    where
        Self: Sized,
    {
        Ok(())
    }

    /// `async` timeout notification. Consumes the item.
    ///
    /// The default implementation is a no-op returning `Ok(())`.
    async fn a_sig_timeout(self) -> Result<(), Self::TimeoutErr>
    where
        Self: Sized,
    {
        Ok(())
    }
}
59
60#[derive(Debug)]
77pub struct TimerDequeueSignalTicket<SIG: TimerDequeueSignal + fmt::Debug + fmt::Display + Eq + PartialEq>
78{
79 signal: SIG,
81
82 timeout_absolute: TimerExpMode,
84}
85
86impl<SIG: TimerDequeueSignal + fmt::Debug + fmt::Display + Eq + PartialEq> TimerDequeueSignalTicket<SIG>
87{
88 pub
89 fn new(sig_hnd: SIG, abs_time_sec: i64, abs_time_nsec: i64) -> TimerResult<Self>
90 {
91 let abs_time_nsec = abs_time_nsec & 999_999_999;
92
93 let cur = common::get_current_timestamp();
94 let req = TimerExpMode::OneShot { sec: abs_time_sec, nsec: abs_time_nsec };
95
96 if TimerExpMode::from(cur) > req
97 {
98 timer_err!(TimerErrorType::Expired, "time already expired");
99 }
100
101
102 return Ok(
103 Self
104 {
105 signal:
106 sig_hnd,
107 timeout_absolute:
108 req
109 }
110 );
111 }
112
113 fn get_time_until(&self) -> TimerExpMode
114 {
115 return self.timeout_absolute;
116 }
117
118 fn send_sig_timeout(self) -> TimerResult<()>
119 {
120 return
121 self
122 .signal
123 .sig_timeout()
124 .map_err(|e|
125 map_timer_err!(TimerErrorType::ExternalError, "cannot send signal, error: {}", e)
126 );
127 }
128
129 async
130 fn async_send_sig_timeout(self) -> TimerResult<()>
131 {
132 return
133 self
134 .signal
135 .a_sig_timeout()
136 .await
137 .map_err(|e|
138 map_timer_err!(TimerErrorType::ExternalError, "cannot send signal, error: {}", e)
139 );
140 }
141}
142
143impl<SIG: TimerDequeueSignal + fmt::Debug + fmt::Display + Eq + PartialEq> fmt::Display for TimerDequeueSignalTicket<SIG>
144{
145 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> Result<(), std::fmt::Error>
146 {
147 write!(f, "{} until: {}", self.signal.get_id(), self.timeout_absolute)
148 }
149}
150
151impl<SIG: TimerDequeueSignal + fmt::Debug + fmt::Display + Eq + PartialEq> Eq for TimerDequeueSignalTicket<SIG> {}
152
153impl<SIG: TimerDequeueSignal + fmt::Debug + fmt::Display + Eq + PartialEq> PartialEq for TimerDequeueSignalTicket<SIG>
154{
155 fn eq(&self, other: &Self) -> bool
156 {
157 return self.signal == other.signal;
158 }
159}
160
161
162impl<SIG: TimerDequeueSignal + fmt::Debug + fmt::Display + Eq + PartialEq> Ord for TimerDequeueSignalTicket<SIG>
163{
164 fn cmp(&self, other: &Self) -> std::cmp::Ordering
165 {
166 return self.timeout_absolute.cmp(&other.timeout_absolute);
167 }
168}
169
170impl<SIG: TimerDequeueSignal + fmt::Debug + fmt::Display + Eq + PartialEq> PartialOrd for TimerDequeueSignalTicket<SIG>
171{
172 fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering>
173 {
174 return Some(self.cmp(other));
175 }
176}
177
178impl<SIG: TimerDequeueSignal + fmt::Debug + fmt::Display + Eq + PartialEq> OrderedTimerDequeIntrf for TimerDequeueSignalTicket<SIG>
179{
180 type Target = SIG;
181 type Ticket = common::NoTicket;
182
183 #[inline]
184 fn wrap(target: Self::Target, abs_time_sec: i64, abs_time_nsec: i64) -> TimerResult<(Self, Self::Ticket)>
185 {
186 return Ok((Self::new(target, abs_time_sec, abs_time_nsec)?, common::NoTicket));
187 }
188
189 #[inline]
190 fn get_timeout_absolute(&self) -> TimerExpMode
191 {
192 return self.timeout_absolute;
193 }
194}
195
196impl<SIG> OrderedTimerDeque<TimerDequeueSignalTicket<SIG>>
197where SIG: TimerDequeueSignal + fmt::Debug + fmt::Display + Eq + PartialEq
198{
199 #[inline]
218 pub
219 fn add_to_timer(&mut self, sig_hnd: SIG, abs_time_sec: i64, abs_time_nsec: i64) -> TimerResult<common::NoTicket>
220 {
221 return self.add_to_timer_local(sig_hnd, abs_time_sec, abs_time_nsec);
222 }
223
224 pub
238 fn remove_from_sched_queue(&mut self, arg_uniq_id: &SIG::TimerQueueID) -> TimerResult<Option<()>>
239 {
240 if self.deque_timeout_list.len() == 0
241 {
242 timer_err!(TimerErrorType::QueueEmpty, "queue list is empty!");
243 }
244 else
245 {
246 if self.deque_timeout_list.len() == 1
248 {
249 let _ = self.deque_timeout_list.pop_front().unwrap();
251
252 self.stop_timer()?;
254
255 return Ok(Some(()));
256 }
257 else
258 {
259 for (pos, q_item)
263 in self.deque_timeout_list.iter().enumerate()
264 {
265 if &q_item.signal.get_id() == arg_uniq_id
266 {
267 let _ =
269 self.deque_timeout_list.remove(pos);
270
271 if pos == 0
273 {
274 self.reschedule_timer()?;
275 }
276
277 return Ok(Some(()));
278 }
279 }
280
281 return Ok(None);
282 }
283 }
284 }
285
286 pub
301 fn timeout_event_handler(&mut self, res: TimerReadRes<u64>) -> TimerResult<()>
302 {
303 if let TimerReadRes::WouldBlock = res
305 {
306 return Ok(());
307 }
308
309 let force_timer_resched =
311 TimerReadRes::Cancelled == res || TimerReadRes::Ok(0) == res;
312
313 let mut error: TimerResult<()> = Ok(());
314 let mut last_time_until: TimerExpMode = TimerExpMode::None;
315
316 let cur_timestamp = common::get_current_timestamp();
317
318 loop
319 {
320 let Some(front_entity) =
322 self.deque_timeout_list.front()
323 else
324 {
325 self.stop_timer()?;
326
327 return Ok(());
328 };
329
330 let time_until = front_entity.get_time_until();
331
332 if time_until <= TimerExpMode::from(cur_timestamp)
333 {
334 let front_entity = self.deque_timeout_list.pop_front().unwrap();
335
336 error = front_entity.send_sig_timeout();
338 }
339 else
340 {
341 if last_time_until != TimerExpMode::None || force_timer_resched == true
343 {
344 self.reschedule_timer()?;
346 }
347
348 break;
349 }
350
351 last_time_until = time_until;
352 } return error;
356 }
357
358 pub async
375 fn async_timeout_event_handler(&mut self, res: TimerReadRes<u64>) -> TimerResult<()>
376 {
377 if let TimerReadRes::WouldBlock = res
379 {
380 return Ok(());
381 }
382
383 let force_timer_resched =
385 TimerReadRes::Cancelled == res || TimerReadRes::Ok(0) == res;
386
387 let mut error: TimerResult<()> = Ok(());
388 let mut last_time_until: TimerExpMode = TimerExpMode::None;
389
390 let cur_timestamp = common::get_current_timestamp();
391
392 loop
393 {
394 let Some(front_entity) =
396 self.deque_timeout_list.front()
397 else
398 {
399 self.stop_timer()?;
400
401 return Ok(());
402 };
403
404 let time_until = front_entity.get_time_until();
405
406 if time_until <= TimerExpMode::from(cur_timestamp)
407 {
408 let front_entity = self.deque_timeout_list.pop_front().unwrap();
409
410 error = front_entity.async_send_sig_timeout().await;
412 }
413 else
414 {
415 if last_time_until != TimerExpMode::None || force_timer_resched == true
417 {
418 self.reschedule_timer()?;
420 }
421
422 break;
423 }
424
425 last_time_until = time_until;
426 } return error;
430 }
431
432}
433
434
#[cfg(test)]
mod tests
{
    use std::{collections::VecDeque, fmt, sync::mpsc::{self, SendError, Sender}, time::{Duration, Instant}};

    use crate::{common, timer::OrderedTimerDeque, timer_portable::timer::TimerReadRes, timer_signal::{TimerDequeueSignal, TimerDequeueSignalTicket}};

    /// Number of samples taken by the `VecDeque` micro-benchmarks below; also
    /// the divisor used for their reported mean.
    const POP_ITERATIONS: u32 = 5000;

    /// Schedules one signal two seconds ahead and checks that the handler
    /// forwards the signal id through the channel once the timer fires.
    #[test]
    fn test_timer_test()
    {
        #[derive(Debug)]
        struct TestSigStruct
        {
            uniq_id: u64,
            mpsc_sender: Sender<u64>,
        }

        impl Eq for TestSigStruct{}

        impl PartialEq for TestSigStruct
        {
            fn eq(&self, other: &Self) -> bool
            {
                return self.uniq_id == other.uniq_id;
            }
        }

        impl fmt::Display for TestSigStruct
        {
            fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result
            {
                write!(f, "sig handler timer id: {}", self.uniq_id)
            }
        }

        impl TimerDequeueSignal for TestSigStruct
        {
            type TimerQueueID = u64;

            type TimeoutErr = SendError<Self::TimerQueueID>;

            fn get_id(&self) -> Self::TimerQueueID
            {
                return self.uniq_id;
            }

            fn sig_timeout(self) -> Result<(), Self::TimeoutErr>
            {
                return self.mpsc_sender.send(self.uniq_id);
            }
        }

        let mut time_list =
            OrderedTimerDeque
                ::<TimerDequeueSignalTicket<TestSigStruct>>
                ::new("test_label".into(), 4, false).unwrap();

        let s = Instant::now();

        let tss_set = common::get_current_timestamp().timestamp()+2;

        let (snd, rcv) = mpsc::channel::<u64>();

        let sig_tm = TestSigStruct{ uniq_id: 456, mpsc_sender: snd };

        println!("signal handler: {}", sig_tm);

        let _ = time_list.add_to_timer(sig_tm, tss_set, 0).unwrap();

        for _ in 0..3
        {
            let event = time_list.wait_for_event().unwrap();
            if let TimerReadRes::WouldBlock = event
            {
                // timer has not fired yet, retry after roughly a second
                std::thread::sleep(Duration::from_millis(1001));

                continue;
            }

            assert_eq!(event, TimerReadRes::Ok(1));

            let e = s.elapsed();
            let ts = common::get_current_timestamp();

            assert_eq!(ts.timestamp(), tss_set);
            println!("instant: {:?} ts: {}, timerset: {}", e, ts.timestamp(), tss_set);

            time_list.timeout_event_handler(event).unwrap();

            let timer_id = rcv.recv_timeout(Duration::from_secs(1)).unwrap();

            assert_eq!(timer_id, 456);

            return;
        }

        panic!("timeout befor timer!");
    }

    /// Micro-benchmark: time of `pop_front` followed by `push_front`.
    #[test]
    fn test_pop()
    {
        let mut ve = VecDeque::<[u8; 32]>::with_capacity(5);

        ve.push_back([0_u8; 32]);
        ve.push_back([1_u8; 32]);
        ve.push_back([2_u8; 32]);
        ve.push_back([3_u8; 32]);

        let mut small: Option<Duration> = None;
        let mut large: Option<Duration> = None;
        let mut some_cnt: u32 = 0;
        let mut mean: u128 = 0;

        for _ in 0..POP_ITERATIONS
        {
            let s = Instant::now();

            let v = ve.pop_front().unwrap();

            if v.len() == 32
            {
                some_cnt += 1;
            }

            ve.push_front(v);

            let e = s.elapsed();

            small = Some(small.map_or(e, |cur| cur.min(e)));
            large = Some(large.map_or(e, |cur| cur.max(e)));

            mean += e.as_nanos();
        }

        // Divide by the real sample count so the printed value is a mean.
        println!("s: {:?}, e: {:?}, mean: {} some_cnt: {}", small.unwrap(), large.unwrap(), mean / u128::from(POP_ITERATIONS), some_cnt);
    }

    /// Micro-benchmark: time of `front` followed by `pop_front`; the
    /// `push_front` that restores the queue is outside the timed section.
    #[test]
    fn test_pop2()
    {
        let mut ve = VecDeque::<[u8; 32]>::with_capacity(5);

        ve.push_back([0_u8; 32]);
        ve.push_back([1_u8; 32]);
        ve.push_back([2_u8; 32]);
        ve.push_back([3_u8; 32]);

        let mut small: Option<Duration> = None;
        let mut large: Option<Duration> = None;
        let mut some_cnt: u32 = 0;
        let mut mean: u128 = 0;

        for _ in 0..POP_ITERATIONS
        {
            let s = Instant::now();

            let v = ve.front().unwrap();

            if v.len() == 32
            {
                some_cnt += 1;
            }

            let v = ve.pop_front().unwrap();

            let e = s.elapsed();
            ve.push_front(v);

            small = Some(small.map_or(e, |cur| cur.min(e)));
            large = Some(large.map_or(e, |cur| cur.max(e)));

            mean += e.as_nanos();
        }

        // Divide by the real sample count so the printed value is a mean.
        println!("s: {:?}, e: {:?}, mean: {} some_cnt: {}", small.unwrap(), large.unwrap(), mean / u128::from(POP_ITERATIONS), some_cnt);
    }
}