timer_deque_rs/deque_timeout/
timer_tickets.rs1use std::{fmt, sync::{Arc, Weak}};
16
17use crate::
18{
19 common::{self, TimerDequeueId}, deque_timeout::{OrderdTimerDequeOnce, OrderdTimerDequePeriodic, OrderedTimerDequeMode}, error::{TimerError, TimerErrorType, TimerResult}, timer_err, timer_portable::timer::{AbsoluteTime, RelativeTime, TimerExpMode}, TimerReadRes
20};
21
22use super::{OrderedTimerDeque, OrderedTimerDequeIntrf};
23
24
25impl PartialEq<TimerDequeueTicket> for TimerDequeueId
26{
27 fn eq(&self, other: &TimerDequeueTicket) -> bool
28 {
29 return self == other.status.as_ref();
30 }
31}
32
33#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
38pub struct TimerDequeueTicket
39{
40 status: Arc<TimerDequeueId>,
44}
45
46impl fmt::Display for TimerDequeueTicket
47{
48 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result
49 {
50 write!(f, "{}", self.status)
51 }
52}
53
54impl PartialEq<TimerDequeueId> for TimerDequeueTicket
55{
56 fn eq(&self, other: &TimerDequeueId) -> bool
57 {
58 return self.status.as_ref() == other;
59 }
60}
61
62impl<MODE: OrderedTimerDequeMode> PartialEq<TimerDequeueTicketIssuer<MODE>> for TimerDequeueTicket
63{
64 fn eq(&self, other: &TimerDequeueTicketIssuer<MODE>) -> bool
65 {
66 let Some(up) = other.weak_status.upgrade()
67 else { return false };
68
69 return self.status.as_ref() == up.as_ref();
70 }
71}
72
73impl AsRef<TimerDequeueId> for TimerDequeueTicket
74{
75 fn as_ref(&self) -> &TimerDequeueId
76 {
77 return &self.status;
78 }
79}
80
81impl TimerDequeueTicket
82{
83 fn new() -> Self
84 {
85 return Self{ status: Arc::new(TimerDequeueId::new() ) }
86 }
87
88 fn pair(&self) -> Weak<TimerDequeueId>
89 {
90 return Arc::downgrade(&self.status);
91 }
92
93 pub
94 fn get_deque_id(&self) -> &TimerDequeueId
95 {
96 return self.status.as_ref();
97 }
98
99 pub
100 fn is_queued(&self) -> bool
101 {
102 return Arc::weak_count(&self.status) > 1;
103 }
104}
105
106#[derive(Debug)]
137pub struct TimerDequeueTicketIssuer<MODE: OrderedTimerDequeMode>
138{
139 weak_status: Weak<TimerDequeueId>,
141
142 timeout_mode: MODE,
144}
145
146impl<MODE: OrderedTimerDequeMode> fmt::Display for TimerDequeueTicketIssuer<MODE>
147{
148 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> Result<(), std::fmt::Error>
149 {
150 write!(f, "{} until: {}", self.weak_status.upgrade().map_or("dropped".into(), |f| f.to_string()),
151 self.timeout_mode)
152 }
153}
154
155impl<MODE: OrderedTimerDequeMode> Eq for TimerDequeueTicketIssuer<MODE> {}
156
157impl<MODE: OrderedTimerDequeMode> PartialEq for TimerDequeueTicketIssuer<MODE>
158{
159 fn eq(&self, other: &Self) -> bool
160 {
161 let s = self.weak_status.upgrade();
162 let o = other.weak_status.upgrade();
163
164 return s == o;
165 }
166}
167
168impl<MODE: OrderedTimerDequeMode> PartialEq<TimerDequeueTicket> for TimerDequeueTicketIssuer<MODE>
169{
170 fn eq(&self, other: &TimerDequeueTicket) -> bool
171 {
172 let Some(s) = self.weak_status.upgrade()
173 else { return false };
174
175 return s.as_ref() == other.status.as_ref();
176 }
177}
178
179impl<MODE: OrderedTimerDequeMode> Ord for TimerDequeueTicketIssuer<MODE>
180{
181 fn cmp(&self, other: &Self) -> std::cmp::Ordering
182 {
183 return self.timeout_mode.cmp(&other.timeout_mode);
184 }
185}
186
187impl<MODE: OrderedTimerDequeMode> PartialOrd for TimerDequeueTicketIssuer<MODE>
188{
189 fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering>
190 {
191 return Some(self.cmp(other));
192 }
193}
194
195
196impl TimerDequeueTicketIssuer<OrderdTimerDequePeriodic>
197{
198 fn new(rel_time: RelativeTime) -> TimerResult<(Self, TimerDequeueTicket)>
200 {
201 let ext_ticket =
202 TimerDequeueTicket::new();
203
204 let int_ticket =
205 Self
206 {
207 weak_status:
208 ext_ticket.pair(),
209 timeout_mode:
210 OrderdTimerDequePeriodic::new(rel_time)
211 };
212
213
214 return Ok((int_ticket, ext_ticket));
215 }
216}
217
218impl TimerDequeueTicketIssuer<OrderdTimerDequeOnce>
219{
220 fn new(abs_time: AbsoluteTime) -> TimerResult<(Self, TimerDequeueTicket)>
222 {
223
224 let cur = AbsoluteTime::now();
225
226 if cur > abs_time
227 {
228 timer_err!(TimerErrorType::Expired, "time already expired");
229 }
230
231 let ext_ticket =
232 TimerDequeueTicket::new();
233
234 let int_ticket =
235 Self
236 {
237 weak_status:
238 ext_ticket.pair(),
239 timeout_mode:
240 OrderdTimerDequeOnce::new(abs_time)
241 };
242
243
244 return Ok((int_ticket, ext_ticket));
245 }
246}
247
248impl<MODE: OrderedTimerDequeMode> TimerDequeueTicketIssuer<MODE>
249{
250 fn is_valid(&self) -> bool
252 {
253 return self.weak_status.upgrade().is_some();
254 }
255
256 fn into_inner(&self) -> Option<TimerDequeueId>
259 {
260 return self.weak_status.upgrade().map(|f| *f.as_ref());
261 }
262}
263
264
265impl<MODE: OrderedTimerDequeMode> OrderedTimerDequeIntrf for TimerDequeueTicketIssuer<MODE>
266{
267 type Target = common::NoTarget;
269
270 type Ticket = TimerDequeueTicket;
272
273 #[inline]
274 fn get_timeout_absolute(&self) -> AbsoluteTime
275 {
276 return self.timeout_mode.get_absolut_timeout();
277 }
278}
279
280impl OrderedTimerDeque<TimerDequeueTicketIssuer<OrderdTimerDequeOnce>>
281{
282 #[inline]
300 pub
301 fn add_to_timer(&mut self, abs_time: AbsoluteTime) -> TimerResult<TimerDequeueTicket>
302 {
303 let (inst, ticket) =
304 TimerDequeueTicketIssuer::<OrderdTimerDequeOnce>::new(abs_time)?;
305
306 return self.add_to_timer_local(inst).map(|_| ticket);
307 }
308
309 pub
324 fn timeout_event_handler(&mut self, res: TimerReadRes<u64>, timeout_items: &mut Vec<TimerDequeueId>) -> TimerResult<()>
325 {
326 if let TimerReadRes::WouldBlock = res
328 {
329 return Ok(());
330 }
331
332 let cur_timestamp = AbsoluteTime::now();
333
334 loop
335 {
336 let Some(front_entity) =
338 self.deque_timeout_list.front()
339 else
340 {
341 break;
342 };
343
344 let time_until = front_entity.get_timeout_absolute();
345
346 if time_until <= cur_timestamp
347 {
348 if let Some(item) = self.deque_timeout_list.pop_front().unwrap().into_inner()
349 {
350 timeout_items.push(item);
351 }
352
353 }
355 else
356 {
357 break;
358 }
359 }
360
361 self.reschedule_timer()?;
363
364 return Ok(());
365 }
366}
367
368impl OrderedTimerDeque<TimerDequeueTicketIssuer<OrderdTimerDequePeriodic>>
369{
370 #[inline]
388 pub
389 fn add_to_timer(&mut self, rel_time: RelativeTime) -> TimerResult<TimerDequeueTicket>
390 {
391 let (inst, ticket) =
392 TimerDequeueTicketIssuer::<OrderdTimerDequePeriodic>::new(rel_time)?;
393
394 return self.add_to_timer_local(inst).map(|_| ticket);
395 }
396
397 pub
412 fn timeout_event_handler(&mut self, res: TimerReadRes<u64>, timeout_items: &mut Vec<TimerDequeueId>) -> TimerResult<()>
413 {
414 if let TimerReadRes::WouldBlock = res
416 {
417 return Ok(());
418 }
419
420 let cur_timestamp = AbsoluteTime::now();
421
422 loop
423 {
424 let Some(front_entity) =
426 self.deque_timeout_list.front()
427 else
428 {
429 break;
430 };
431
432 let time_until = front_entity.get_timeout_absolute();
433
434 if time_until <= cur_timestamp
435 {
436 let mut deq = self.deque_timeout_list.pop_front().unwrap();
437
438 if let Some(item) = deq.into_inner()
439 {
440 timeout_items.push(item);
442
443 deq.timeout_mode.advance_timeout();
445
446 self.add_to_timer_local(deq)?;
448 }
449
450 }
452 else
453 {
454 break;
455 }
456 }
457
458 self.reschedule_timer()?;
460
461 return Ok(());
462 }
463
464}
465
466impl<MODE: OrderedTimerDequeMode> OrderedTimerDeque<TimerDequeueTicketIssuer<MODE>>
467{
468 pub
482 fn remove_from_sched_queue(&mut self, ticket: TimerDequeueTicket) -> TimerResult<()>
483 {
484 if self.deque_timeout_list.len() == 0
485 {
486 timer_err!(TimerErrorType::QueueEmpty, "queue list is empty!");
487 }
488 else
489 {
490 if self.deque_timeout_list.len() == 1
492 {
493 let _ = self.deque_timeout_list.pop_front();
495
496 self.stop_timer()?;
498
499 return Ok(());
500 }
501 else
502 {
503 for (pos, q_item)
507 in self.deque_timeout_list.iter().enumerate()
508 {
509 if q_item == &ticket
510 {
511 let _ =
513 self.deque_timeout_list.remove(pos).unwrap().into_inner();
514
515 if pos == 0
517 {
518 self.reschedule_timer()?;
519 }
520
521 return Ok(());
522 }
523 }
524
525 return Ok(());
526 }
527 }
528 }
529}
530
#[cfg(test)]
mod tests
{
    use std::{cmp::Ordering, time::{Duration, Instant}};

    use crate::{common, deque_timeout::OrderdTimerDequeOnce, timer_portable::timer::{AbsoluteTime, TimerReadRes}, OrderedTimerDeque, TimerDequeueId, TimerDequeueTicketIssuer};

    /// Schedules a single one-shot ticket two seconds ahead and checks that
    /// the timer reports exactly that ticket when it fires.
    #[test]
    fn test_ticket_0()
    {
        let mut time_list =
            OrderedTimerDeque
                ::<TimerDequeueTicketIssuer<OrderdTimerDequeOnce>>
                ::new("test_label".into(), 4, false).unwrap();

        let s = Instant::now();

        let ts = common::get_current_timestamp();

        let tss_set = AbsoluteTime::new_time(ts.timestamp()+2, ts.timestamp_subsec_nanos() as i64);

        let ticket = time_list.add_to_timer(tss_set).unwrap();

        println!("ticket issued: {}", ticket);

        for _ in 0..3
        {
            let event = time_list.wait_for_event().unwrap();

            if matches!(event, TimerReadRes::WouldBlock)
            {
                // Not fired yet, give the timer some more time.
                std::thread::sleep(Duration::from_millis(1001));

                continue;
            }

            assert_eq!(event, TimerReadRes::Ok(1));

            let e = s.elapsed();
            let ts = AbsoluteTime::now();

            assert_eq!(ts.seconds_cmp(&tss_set), Ordering::Equal);
            println!("ev: {}, instant: {:?} ts: {}, timerset: {}", event, e, ts, tss_set);

            let mut tm_items: Vec<TimerDequeueId> = Vec::with_capacity(1);

            time_list.timeout_event_handler(event, &mut tm_items).unwrap();

            // Check the length before touching the first element so that a
            // failure is reported by the assertion and not by an index panic.
            assert_eq!(tm_items.len(), 1);

            println!("tickets: {}", tm_items[0]);

            assert_eq!(tm_items.first().unwrap(), &ticket);

            return;
        }

        panic!("timeout before timer!");
    }
}