timer_deque_rs/deque_timeout/
timer_consumer.rs1use std::fmt;
17
18use crate::deque_timeout::{OrderdTimerDequeOnce, OrderdTimerDequePeriodic, OrderedTimerDequeMode};
19use crate::error::{TimerErrorType, TimerResult};
20use crate::{common, timer_err, TimerReadRes};
21use crate::timer_portable::timer::{AbsoluteTime, RelativeTime};
22
23use super::{OrderedTimerDeque, OrderedTimerDequeIntrf};
24
25#[derive(Debug)]
61pub struct TimerDequeueConsumer<R, MODE>
62where
63 R: PartialEq + Eq + fmt::Debug + fmt::Display + Send,
64 MODE: OrderedTimerDequeMode
65{
66 target: R,
68
69 timeout_mode: MODE,
71}
72
73impl<R, MODE> fmt::Display
74for TimerDequeueConsumer<R, MODE>
75where
76 R: PartialEq + Eq + fmt::Display + fmt::Debug + Send,
77 MODE: OrderedTimerDequeMode
78{
79 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> Result<(), std::fmt::Error>
80 {
81 write!(f, "{} until: {}", self.target, self.timeout_mode)
82 }
83}
84
85impl<R, MODE> TimerDequeueConsumer<R, MODE>
86where
87 R: PartialEq + Eq + fmt::Display + fmt::Debug + Send,
88 MODE: OrderedTimerDequeMode
89{
90 fn into_inner(self) -> R
91 {
92 return self.target;
93 }
94}
95
96
97impl<R> TimerDequeueConsumer<R, OrderdTimerDequeOnce>
98where R: PartialEq + Eq + fmt::Display + fmt::Debug + Send
99{
100 fn new(target: R, abs_time: AbsoluteTime) -> TimerResult<Self>
101 {
102 let cur = AbsoluteTime::now();
103
104 if cur > abs_time
105 {
106 timer_err!(TimerErrorType::Expired,
107 "time already expired now: {}, req: {}", cur, abs_time);
108 }
109
110 return Ok(
111 Self
112 {
113 target:
114 target,
115 timeout_mode:
116 OrderdTimerDequeOnce::new(abs_time),
117 }
118 );
119 }
120}
121
122impl<R> TimerDequeueConsumer<R, OrderdTimerDequePeriodic>
123where R: PartialEq + Eq + fmt::Display + fmt::Debug + Send + Clone
124{
125 fn new(target: R, rel_time: RelativeTime) -> TimerResult<Self>
126 {
127 return Ok(
128 Self
129 {
130 target:
131 target,
132 timeout_mode:
133 OrderdTimerDequePeriodic::new(rel_time),
134 }
135 );
136 }
137
138 fn clone_inner(&self) -> R
139 {
140 return self.target.clone();
141 }
142}
143
144impl<R, MODE> Eq for TimerDequeueConsumer<R, MODE>
145where
146 R: PartialEq + Eq + fmt::Display + fmt::Debug + Send,
147 MODE: OrderedTimerDequeMode
148{
149
150}
151
152impl<R, MODE> PartialEq for TimerDequeueConsumer<R, MODE>
153where
154 R: PartialEq + Eq + fmt::Display + fmt::Debug + Send,
155 MODE: OrderedTimerDequeMode
156{
157 fn eq(&self, other: &Self) -> bool
158 {
159 return self.target == other.target;
160 }
161}
162
163impl<R, MODE> PartialEq<R> for TimerDequeueConsumer<R, MODE>
164where
165 R: PartialEq + Eq + fmt::Display + fmt::Debug + Send,
166 MODE: OrderedTimerDequeMode
167{
168 fn eq(&self, other: &R) -> bool
169 {
170 return &self.target == other;
171 }
172}
173
174impl<R, MODE> Ord for TimerDequeueConsumer<R, MODE>
175where
176 R: PartialEq + Eq + fmt::Display + fmt::Debug + Send,
177 MODE: OrderedTimerDequeMode
178{
179 fn cmp(&self, other: &Self) -> std::cmp::Ordering
180 {
181 return self.timeout_mode.cmp(&other.timeout_mode);
182 }
183}
184
185impl<R, MODE> PartialOrd for TimerDequeueConsumer<R, MODE>
186where
187 R: PartialEq + Eq + fmt::Display + fmt::Debug + Send,
188 MODE: OrderedTimerDequeMode
189{
190 fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering>
191 {
192 return Some(self.cmp(other));
193 }
194}
195
196impl<R, MODE> OrderedTimerDequeIntrf for TimerDequeueConsumer<R, MODE>
197where
198 R: PartialEq + Eq + fmt::Display + fmt::Debug + Send,
199 MODE: OrderedTimerDequeMode
200{
201 type Target = R;
203
204 type Ticket = common::NoTicket;
206
207 #[inline]
208 fn get_timeout_absolute(&self) -> AbsoluteTime
209 {
210 return self.timeout_mode.get_absolut_timeout();
211 }
212}
213
214impl<R> OrderedTimerDeque<TimerDequeueConsumer<R, OrderdTimerDequeOnce>>
215where R: PartialEq + Eq + fmt::Display + fmt::Debug + Send
216{
217 pub
236 fn add_to_timer(&mut self, entity: R, abs_time: AbsoluteTime) -> TimerResult<common::NoTicket>
237 {
238 let inst =
239 TimerDequeueConsumer::<R, OrderdTimerDequeOnce>::new(entity, abs_time)?;
240
241 return self.add_to_timer_local(inst).map(|_| common::NoTicket);
242 }
243
244 pub
259 fn timeout_event_handler(&mut self, res: TimerReadRes<u64>, timeout_items: &mut Vec<R>) -> TimerResult<()>
260 {
261 if let TimerReadRes::WouldBlock = res
263 {
264 return Ok(());
265 }
266
267 let cur_timestamp = AbsoluteTime::now();
268
269 loop
270 {
271 let Some(front_entity) =
273 self.deque_timeout_list.front()
274 else
275 {
276 break;
277 };
278
279 let time_until = front_entity.get_timeout_absolute();
280
281 if time_until <= cur_timestamp
282 {
283 timeout_items.push(self.deque_timeout_list.pop_front().unwrap().into_inner());
284 }
285 else
286 {
287 break;
288 }
289 }
290
291 self.reschedule_timer()?;
293
294 return Ok(());
295 }
296
297}
298
299impl<R> OrderedTimerDeque<TimerDequeueConsumer<R, OrderdTimerDequePeriodic>>
300where R: PartialEq + Eq + fmt::Display + fmt::Debug + Send + Clone
301{
302 pub
323 fn add_to_timer(&mut self, entity: R, rel_time: RelativeTime) -> TimerResult<common::NoTicket>
324 {
325 let inst =
326 TimerDequeueConsumer::<R, OrderdTimerDequePeriodic>::new(entity, rel_time)?;
327
328 return self.add_to_timer_local(inst).map(|_| common::NoTicket);
329 }
330
331
332
333 pub
348 fn timeout_event_handler(&mut self, res: TimerReadRes<u64>, timeout_items: &mut Vec<R>) -> TimerResult<()>
349 {
350 if let TimerReadRes::WouldBlock = res
352 {
353 return Ok(());
354 }
355
356 let cur_timestamp = AbsoluteTime::now();
357
358 loop
359 {
360 let Some(front_entity) =
362 self.deque_timeout_list.front()
363 else
364 {
365 break;
366 };
367
368 let time_until = front_entity.get_timeout_absolute();
369
370 if time_until <= cur_timestamp
371 {
372 let mut deq = self.deque_timeout_list.pop_front().unwrap();
373
374 timeout_items.push(deq.clone_inner());
375
376 deq.timeout_mode.advance_timeout();
378
379 self.add_to_timer_local(deq)?;
381 }
382 else
383 {
384 break;
385 }
386 }
387
388 self.reschedule_timer()?;
390
391 return Ok(());
392 }
393
394}
395
396impl<R, MODE> OrderedTimerDeque<TimerDequeueConsumer<R, MODE>>
397where
398 R: PartialEq + Eq + fmt::Display + fmt::Debug + Send,
399 MODE: OrderedTimerDequeMode
400{
401 pub
418 fn remove_from_sched_queue(&mut self, entity: &R) -> TimerResult<Option<R>>
419 {
420 if self.deque_timeout_list.len() == 0
421 {
422 timer_err!(TimerErrorType::QueueEmpty, "queue list is empty!");
423 }
424 else
425 {
426 if self.deque_timeout_list.len() == 1
428 {
429 let ret_ent = self.deque_timeout_list.pop_front().unwrap();
431
432 self.stop_timer()?;
434
435 return Ok(Some(ret_ent.into_inner()));
436 }
437 else
438 {
439 for (pos, q_item)
443 in self.deque_timeout_list.iter().enumerate()
444 {
445 if q_item == entity
446 {
447 let ret_ent =
449 self.deque_timeout_list.remove(pos).unwrap().into_inner();
450
451 if pos == 0
453 {
454 self.reschedule_timer()?;
455 }
456
457 return Ok(Some(ret_ent));
458 }
459 }
460
461 return Ok(None);
462 }
463 }
464 }
465}
466
467#[cfg(test)]
468mod tests
469{
470 use std::{cmp::Ordering, fmt, time::{Duration, Instant}};
471
472 use crate::{common, deque_timeout::OrderdTimerDequeOnce, timer_portable::timer::{AbsoluteTime, TimerReadRes}, OrderedTimerDeque, TimerDequeueConsumer};
473
474 #[test]
475 fn test_0()
476 {
477 #[derive(Debug, PartialEq, Eq, Clone)]
478 struct TestItem(u64);
479
480 impl fmt::Display for TestItem
481 {
482 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result
483 {
484 write!(f, "0 = {}", self.0)
485 }
486 }
487
488 let mut time_list =
489 OrderedTimerDeque
490 ::<TimerDequeueConsumer<TestItem, OrderdTimerDequeOnce>>
491 ::new("test_label".into(), 4, false).unwrap();
492
493 let s = Instant::now();
494
495 let ts = common::get_current_timestamp();
496
497 let tss_set =
498 AbsoluteTime
499 ::new_time(ts.timestamp()+2, ts.timestamp_subsec_nanos() as i64)
500 .unwrap();
501
502 let ent1 = TestItem(1);
503 time_list.add_to_timer(ent1, tss_set).unwrap();
504
505
506 for _ in 0..3
507 {
508 let event = time_list.wait_for_event().unwrap();
509 if let TimerReadRes::WouldBlock = event
510 {
511 std::thread::sleep(Duration::from_millis(1001));
512
513 continue;
514 }
515
516 assert_eq!(event, TimerReadRes::Ok(1));
517
518 let e = s.elapsed();
519 let ts = AbsoluteTime::now();
520
521 assert_eq!(ts.seconds_cmp(&tss_set) == Ordering::Equal, true);
522
523 println!("ev: {}, instant: {:?} ts: {}, timerset: {}", event, e, ts, tss_set);
524
525 let mut tm_items: Vec<TestItem> = Vec::with_capacity(1);
526
527 time_list.timeout_event_handler(event, &mut tm_items).unwrap();
528
529 assert_eq!(tm_items.len(), 1);
530 assert_eq!(tm_items.first().is_some(), true);
531 assert_eq!(tm_items.first().unwrap(), &TestItem(1));
532
533 return;
534 }
535
536 panic!("timeout befor timer!");
537 }
538}