timer_deque_rs/deque_timeout/
timer_tickets.rs1use std::{fmt, sync::{Arc, Weak}};
17
18use crate::
19{
20 common::{self, TimerDequeueId},
21 deque_timeout::{OrderdTimerDequeOnce, OrderdTimerDequePeriodic, OrderedTimerDequeMode},
22 error::{TimerErrorType, TimerResult},
23 timer_err,
24 timer_portable::timer::{AbsoluteTime, RelativeTime},
25 TimerReadRes
26};
27
28use super::{OrderedTimerDeque, OrderedTimerDequeIntrf};
29
30
31impl PartialEq<TimerDequeueTicket> for TimerDequeueId
32{
33 fn eq(&self, other: &TimerDequeueTicket) -> bool
34 {
35 return self == other.status.as_ref();
36 }
37}
38
39#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
44pub struct TimerDequeueTicket
45{
46 status: Arc<TimerDequeueId>,
50}
51
52impl fmt::Display for TimerDequeueTicket
53{
54 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result
55 {
56 write!(f, "{}", self.status)
57 }
58}
59
60impl PartialEq<TimerDequeueId> for TimerDequeueTicket
61{
62 fn eq(&self, other: &TimerDequeueId) -> bool
63 {
64 return self.status.as_ref() == other;
65 }
66}
67
68impl<MODE: OrderedTimerDequeMode> PartialEq<TimerDequeueTicketIssuer<MODE>> for TimerDequeueTicket
69{
70 fn eq(&self, other: &TimerDequeueTicketIssuer<MODE>) -> bool
71 {
72 let Some(up) = other.weak_status.upgrade()
73 else { return false };
74
75 return self.status.as_ref() == up.as_ref();
76 }
77}
78
79impl AsRef<TimerDequeueId> for TimerDequeueTicket
80{
81 fn as_ref(&self) -> &TimerDequeueId
82 {
83 return &self.status;
84 }
85}
86
87impl TimerDequeueTicket
88{
89 fn new() -> Self
90 {
91 return Self{ status: Arc::new(TimerDequeueId::new() ) }
92 }
93
94 fn pair(&self) -> Weak<TimerDequeueId>
95 {
96 return Arc::downgrade(&self.status);
97 }
98
99 pub
100 fn get_deque_id(&self) -> &TimerDequeueId
101 {
102 return self.status.as_ref();
103 }
104
105 pub
106 fn is_queued(&self) -> bool
107 {
108 return Arc::weak_count(&self.status) > 1;
109 }
110}
111
112#[derive(Debug)]
143pub struct TimerDequeueTicketIssuer<MODE: OrderedTimerDequeMode>
144{
145 weak_status: Weak<TimerDequeueId>,
147
148 timeout_mode: MODE,
150}
151
152impl<MODE: OrderedTimerDequeMode> fmt::Display for TimerDequeueTicketIssuer<MODE>
153{
154 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> Result<(), std::fmt::Error>
155 {
156 write!(f, "{} until: {}", self.weak_status.upgrade().map_or("dropped".into(), |f| f.to_string()),
157 self.timeout_mode)
158 }
159}
160
161impl<MODE: OrderedTimerDequeMode> Eq for TimerDequeueTicketIssuer<MODE> {}
162
163impl<MODE: OrderedTimerDequeMode> PartialEq for TimerDequeueTicketIssuer<MODE>
164{
165 fn eq(&self, other: &Self) -> bool
166 {
167 let s = self.weak_status.upgrade();
168 let o = other.weak_status.upgrade();
169
170 return s == o;
171 }
172}
173
174impl<MODE: OrderedTimerDequeMode> PartialEq<TimerDequeueTicket> for TimerDequeueTicketIssuer<MODE>
175{
176 fn eq(&self, other: &TimerDequeueTicket) -> bool
177 {
178 let Some(s) = self.weak_status.upgrade()
179 else { return false };
180
181 return s.as_ref() == other.status.as_ref();
182 }
183}
184
185impl<MODE: OrderedTimerDequeMode> Ord for TimerDequeueTicketIssuer<MODE>
186{
187 fn cmp(&self, other: &Self) -> std::cmp::Ordering
188 {
189 return self.timeout_mode.cmp(&other.timeout_mode);
190 }
191}
192
193impl<MODE: OrderedTimerDequeMode> PartialOrd for TimerDequeueTicketIssuer<MODE>
194{
195 fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering>
196 {
197 return Some(self.cmp(other));
198 }
199}
200
201
202impl TimerDequeueTicketIssuer<OrderdTimerDequePeriodic>
203{
204 fn new(rel_time: RelativeTime) -> TimerResult<(Self, TimerDequeueTicket)>
206 {
207 let ext_ticket =
208 TimerDequeueTicket::new();
209
210 let int_ticket =
211 Self
212 {
213 weak_status:
214 ext_ticket.pair(),
215 timeout_mode:
216 OrderdTimerDequePeriodic::new(rel_time)
217 };
218
219
220 return Ok((int_ticket, ext_ticket));
221 }
222}
223
224impl TimerDequeueTicketIssuer<OrderdTimerDequeOnce>
225{
226 fn new(abs_time: AbsoluteTime) -> TimerResult<(Self, TimerDequeueTicket)>
228 {
229
230 let cur = AbsoluteTime::now();
231
232 if cur > abs_time
233 {
234 timer_err!(TimerErrorType::Expired, "time already expired");
235 }
236
237 let ext_ticket =
238 TimerDequeueTicket::new();
239
240 let int_ticket =
241 Self
242 {
243 weak_status:
244 ext_ticket.pair(),
245 timeout_mode:
246 OrderdTimerDequeOnce::new(abs_time)
247 };
248
249
250 return Ok((int_ticket, ext_ticket));
251 }
252}
253
254impl<MODE: OrderedTimerDequeMode> TimerDequeueTicketIssuer<MODE>
255{
256 fn is_valid(&self) -> bool
258 {
259 return self.weak_status.upgrade().is_some();
260 }
261
262 fn into_inner(&self) -> Option<TimerDequeueId>
265 {
266 return self.weak_status.upgrade().map(|f| *f.as_ref());
267 }
268}
269
270
271impl<MODE: OrderedTimerDequeMode> OrderedTimerDequeIntrf for TimerDequeueTicketIssuer<MODE>
272{
273 type Target = common::NoTarget;
275
276 type Ticket = TimerDequeueTicket;
278
279 #[inline]
280 fn get_timeout_absolute(&self) -> AbsoluteTime
281 {
282 return self.timeout_mode.get_absolut_timeout();
283 }
284}
285
286impl OrderedTimerDeque<TimerDequeueTicketIssuer<OrderdTimerDequeOnce>>
287{
288 #[inline]
306 pub
307 fn add_to_timer(&mut self, abs_time: AbsoluteTime) -> TimerResult<TimerDequeueTicket>
308 {
309 let (inst, ticket) =
310 TimerDequeueTicketIssuer::<OrderdTimerDequeOnce>::new(abs_time)?;
311
312 return self.add_to_timer_local(inst).map(|_| ticket);
313 }
314
315 pub
330 fn timeout_event_handler(&mut self, res: TimerReadRes<u64>, timeout_items: &mut Vec<TimerDequeueId>) -> TimerResult<()>
331 {
332 if let TimerReadRes::WouldBlock = res
334 {
335 return Ok(());
336 }
337
338 let cur_timestamp = AbsoluteTime::now();
339
340 loop
341 {
342 let Some(front_entity) =
344 self.deque_timeout_list.front()
345 else
346 {
347 break;
348 };
349
350 let time_until = front_entity.get_timeout_absolute();
351
352 if time_until <= cur_timestamp
353 {
354 if let Some(item) = self.deque_timeout_list.pop_front().unwrap().into_inner()
355 {
356 timeout_items.push(item);
357 }
358
359 }
361 else
362 {
363 break;
364 }
365 }
366
367 self.reschedule_timer()?;
369
370 return Ok(());
371 }
372}
373
374impl OrderedTimerDeque<TimerDequeueTicketIssuer<OrderdTimerDequePeriodic>>
375{
376 #[inline]
394 pub
395 fn add_to_timer(&mut self, rel_time: RelativeTime) -> TimerResult<TimerDequeueTicket>
396 {
397 let (inst, ticket) =
398 TimerDequeueTicketIssuer::<OrderdTimerDequePeriodic>::new(rel_time)?;
399
400 return self.add_to_timer_local(inst).map(|_| ticket);
401 }
402
403 pub
418 fn timeout_event_handler(&mut self, res: TimerReadRes<u64>, timeout_items: &mut Vec<TimerDequeueId>) -> TimerResult<()>
419 {
420 if let TimerReadRes::WouldBlock = res
422 {
423 return Ok(());
424 }
425
426 let cur_timestamp = AbsoluteTime::now();
427
428 loop
429 {
430 let Some(front_entity) =
432 self.deque_timeout_list.front()
433 else
434 {
435 break;
436 };
437
438 let time_until = front_entity.get_timeout_absolute();
439
440 if time_until <= cur_timestamp
441 {
442 let mut deq = self.deque_timeout_list.pop_front().unwrap();
443
444 if let Some(item) = deq.into_inner()
445 {
446 timeout_items.push(item);
448
449 deq.timeout_mode.advance_timeout();
451
452 self.add_to_timer_local(deq)?;
454 }
455
456 }
458 else
459 {
460 break;
461 }
462 }
463
464 self.reschedule_timer()?;
466
467 return Ok(());
468 }
469
470}
471
472impl<MODE: OrderedTimerDequeMode> OrderedTimerDeque<TimerDequeueTicketIssuer<MODE>>
473{
474 pub
488 fn remove_from_sched_queue(&mut self, ticket: TimerDequeueTicket) -> TimerResult<()>
489 {
490 if self.deque_timeout_list.len() == 0
491 {
492 timer_err!(TimerErrorType::QueueEmpty, "queue list is empty!");
493 }
494 else
495 {
496 if self.deque_timeout_list.len() == 1
498 {
499 let _ = self.deque_timeout_list.pop_front();
501
502 self.stop_timer()?;
504
505 return Ok(());
506 }
507 else
508 {
509 for (pos, q_item)
513 in self.deque_timeout_list.iter().enumerate()
514 {
515 if q_item == &ticket
516 {
517 let _ =
519 self.deque_timeout_list.remove(pos).unwrap().into_inner();
520
521 if pos == 0
523 {
524 self.reschedule_timer()?;
525 }
526
527 return Ok(());
528 }
529 }
530
531 return Ok(());
532 }
533 }
534 }
535}
536
#[cfg(test)]
mod tests {
    use std::{cmp::Ordering, time::{Duration, Instant}};

    use crate::{common, deque_timeout::OrderdTimerDequeOnce, timer_portable::timer::{AbsoluteTime, TimerReadRes}, OrderedTimerDeque, TimerDequeueId, TimerDequeueTicketIssuer};

    /// Queues a single one-shot ticket two seconds ahead, polls the deque
    /// until the timer fires, and checks that exactly that ticket is
    /// reported by the timeout handler.
    #[test]
    fn test_ticket_0() {
        let mut time_list =
            OrderedTimerDeque::<TimerDequeueTicketIssuer<OrderdTimerDequeOnce>>::new("test_label".into(), 4, false)
                .unwrap();

        let s = Instant::now();

        let ts = common::get_current_timestamp();

        let tss_set =
            AbsoluteTime::new_time(ts.timestamp() + 2, ts.timestamp_subsec_nanos() as i64)
                .unwrap();

        let ticket = time_list.add_to_timer(tss_set).unwrap();

        println!("ticket issued: {}", ticket);

        for _ in 0..3 {
            let event = time_list.wait_for_event().unwrap();

            // Nothing fired yet: wait a little over a second and poll again.
            if let TimerReadRes::WouldBlock = event {
                std::thread::sleep(Duration::from_millis(1001));

                continue;
            }

            assert_eq!(event, TimerReadRes::Ok(1));

            let e = s.elapsed();
            let ts = AbsoluteTime::now();

            assert_eq!(ts.seconds_cmp(&tss_set), Ordering::Equal);
            println!("ev: {}, instant: {:?} ts: {}, timerset: {}", event, e, ts, tss_set);

            let mut tm_items: Vec<TimerDequeueId> = Vec::with_capacity(1);

            time_list.timeout_event_handler(event, &mut tm_items).unwrap();

            // Check the length before indexing so that an empty result fails
            // with this assertion rather than an out-of-bounds panic.
            assert_eq!(tm_items.len(), 1);

            println!("tickets: {}", tm_items[0]);

            assert_eq!(tm_items.first().unwrap(), &ticket);

            return;
        }

        panic!("timeout befor timer!");
    }
}
599}