1use std::{fmt, sync::{Arc, Weak}};
16
17use rand::RngCore;
18
19use crate::
20{
21 common,
22 error::{TimerError, TimerErrorType, TimerResult},
23 timer::{OrderedTimerDeque, OrderedTimerDequeIntrf},
24 timer_err,
25 timer_portable::timer::TimerExpMode, TimerReadRes
26};
27
28#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
31pub struct TimerDequeueId
32{
33 timestamp: i64,
35
36 timestamp_ns: u32,
38
39 ran_id: u32,
41}
42
43impl fmt::Display for TimerDequeueId
44{
45 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result
46 {
47 write!(f, "{:X}/{:X}/{:X}", self.timestamp, self.timestamp_ns, self.ran_id)
48 }
49}
50
51impl PartialEq<TimerDequeueTicket> for TimerDequeueId
52{
53 fn eq(&self, other: &TimerDequeueTicket) -> bool
54 {
55 return self == other.status.as_ref();
56 }
57}
58
59impl AsRef<TimerDequeueId> for TimerDequeueId
60{
61 fn as_ref(&self) -> &TimerDequeueId
62 {
63 return self;
64 }
65}
66
67impl TimerDequeueId
68{
69 fn new() -> Self
70 {
71 let mut rng = rand::rng();
72
73 let ts = common::get_current_timestamp();
74
75 return
76 Self
77 {
78 timestamp: ts.timestamp(),
79 timestamp_ns: ts.timestamp_subsec_nanos(),
80 ran_id: rng.next_u32(),
81 };
82 }
83}
84
85#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
86pub struct TimerDequeueTicket
87{
88 status: Arc<TimerDequeueId>,
89}
90
91impl fmt::Display for TimerDequeueTicket
92{
93 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result
94 {
95 write!(f, "{}", self.status)
96 }
97}
98
99impl PartialEq<TimerDequeueId> for TimerDequeueTicket
100{
101 fn eq(&self, other: &TimerDequeueId) -> bool
102 {
103 return self.status.as_ref() == other;
104 }
105}
106
107impl PartialEq<TimerDequeueTicketIssuer> for TimerDequeueTicket
108{
109 fn eq(&self, other: &TimerDequeueTicketIssuer) -> bool
110 {
111 let Some(up) = other.weak_status.upgrade()
112 else { return false };
113
114 return self.status.as_ref() == up.as_ref();
115 }
116}
117
118impl AsRef<TimerDequeueId> for TimerDequeueTicket
119{
120 fn as_ref(&self) -> &TimerDequeueId
121 {
122 return &self.status;
123 }
124}
125
126impl TimerDequeueTicket
127{
128 fn new() -> Self
129 {
130 return Self{ status: Arc::new(TimerDequeueId::new() ) }
131 }
132
133 fn pair(&self) -> Weak<TimerDequeueId>
134 {
135 return Arc::downgrade(&self.status);
136 }
137
138 pub
139 fn get_deque_id(&self) -> &TimerDequeueId
140 {
141 return self.status.as_ref();
142 }
143
144 pub
145 fn is_queued(&self) -> bool
146 {
147 return Arc::weak_count(&self.status) > 1;
148 }
149}
150
151#[derive(Debug)]
156pub struct TimerDequeueTicketIssuer
157{
158 weak_status: Weak<TimerDequeueId>,
160
161 timeout_absolute: TimerExpMode,
163}
164
165impl fmt::Display for TimerDequeueTicketIssuer
166{
167 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> Result<(), std::fmt::Error>
168 {
169 write!(f, "{} until: {}", self.weak_status.upgrade().map_or("dropped".into(), |f| f.to_string()),
170 self.timeout_absolute)
171 }
172}
173
174impl Eq for TimerDequeueTicketIssuer {}
175
176impl PartialEq for TimerDequeueTicketIssuer
177{
178 fn eq(&self, other: &Self) -> bool
179 {
180 let s = self.weak_status.upgrade();
181 let o = other.weak_status.upgrade();
182
183 return s == o;
184 }
185}
186
187impl PartialEq<TimerDequeueTicket> for TimerDequeueTicketIssuer
188{
189 fn eq(&self, other: &TimerDequeueTicket) -> bool
190 {
191 let Some(s) = self.weak_status.upgrade()
192 else { return false };
193
194 return s.as_ref() == other.status.as_ref();
195 }
196}
197
198impl Ord for TimerDequeueTicketIssuer
199{
200 fn cmp(&self, other: &Self) -> std::cmp::Ordering
201 {
202 return self.timeout_absolute.cmp(&other.timeout_absolute);
203 }
204}
205
206impl PartialOrd for TimerDequeueTicketIssuer
207{
208 fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering>
209 {
210 return Some(self.cmp(other));
211 }
212}
213
214
215impl TimerDequeueTicketIssuer
216{
217 fn new(abs_time_sec: i64, abs_time_nsec: i64) -> TimerResult<(Self, TimerDequeueTicket)>
219 {
220 let abs_time_nsec = abs_time_nsec & 999_999_999;
221
222 let cur = common::get_current_timestamp();
223 let req = TimerExpMode::OneShot { sec: abs_time_sec, nsec: abs_time_nsec };
224
225 if TimerExpMode::from(cur) > req
226 {
227 timer_err!(TimerErrorType::Expired, "time already expired");
228 }
229
230
231 let ext_ticket =
232 TimerDequeueTicket::new();
233
234 let int_ticket =
235 Self
236 {
237 weak_status: ext_ticket.pair(),
238 timeout_absolute: req
239 };
240
241
242 return Ok((int_ticket, ext_ticket));
243 }
244
245 fn get_time_until(&self) -> TimerExpMode
247 {
248 return self.timeout_absolute;
249 }
250
251 fn is_valid(&self) -> bool
253 {
254 return self.weak_status.upgrade().is_some();
255 }
256
257 fn into_inner(&self) -> Option<TimerDequeueId>
260 {
261 return self.weak_status.upgrade().map(|f| *f.as_ref());
262 }
263}
264
265
266impl OrderedTimerDequeIntrf for TimerDequeueTicketIssuer
267{
268 type Target = common::NoTarget;
270
271 type Ticket = TimerDequeueTicket;
273
274 #[inline]
275 fn wrap(_target: Self::Target, abs_time_sec: i64, abs_time_nsec: i64) -> TimerResult<(Self, Self::Ticket)>
276 {
277 return Self::new(abs_time_sec, abs_time_nsec);
278 }
279
280 #[inline]
281 fn get_timeout_absolute(&self) -> TimerExpMode
282 {
283 return self.timeout_absolute;
284 }
285}
286
287impl OrderedTimerDeque<TimerDequeueTicketIssuer>
288{
289 #[inline]
305 pub
306 fn add_to_timer(&mut self, abs_time_sec: i64, abs_time_nsec: i64) -> TimerResult<TimerDequeueTicket>
307 {
308 return self.add_to_timer_local(common::NoTarget, abs_time_sec, abs_time_nsec);
309 }
310
311 pub
325 fn remove_from_sched_queue(&mut self, ticket: TimerDequeueTicket) -> TimerResult<()>
326 {
327 if self.deque_timeout_list.len() == 0
328 {
329 timer_err!(TimerErrorType::QueueEmpty, "queue list is empty!");
330 }
331 else
332 {
333 if self.deque_timeout_list.len() == 1
335 {
336 let _ = self.deque_timeout_list.pop_front();
338
339 self.stop_timer()?;
341
342 return Ok(());
343 }
344 else
345 {
346 for (pos, q_item)
350 in self.deque_timeout_list.iter().enumerate()
351 {
352 if q_item == &ticket
353 {
354 let _ =
356 self.deque_timeout_list.remove(pos).unwrap().into_inner();
357
358 if pos == 0
360 {
361 self.reschedule_timer()?;
362 }
363
364 return Ok(());
365 }
366 }
367
368 return Ok(());
369 }
370 }
371 }
372
373 pub
388 fn timeout_event_handler(&mut self, res: TimerReadRes<u64>, timeout_items: &mut Vec<TimerDequeueId>) -> TimerResult<()>
389 {
390 if let TimerReadRes::WouldBlock = res
392 {
393 return Ok(());
394 }
395
396 let force_timer_resched =
398 TimerReadRes::Cancelled == res || TimerReadRes::Ok(0) == res;
399
400
401 let mut to_remove: Vec<TimerDequeueId> = Vec::new();
402 let mut error: Option<TimerError> = None;
403 let mut last_time_until: TimerExpMode = TimerExpMode::None;
404 let cur_timestamp = common::get_current_timestamp();
405
406 loop
407 {
408 let Some(front_entity) =
410 self.deque_timeout_list.front()
411 else
412 {
413 let Err(e) = self.stop_timer()
415 else
416 {
417 break;
418 };
419
420 error = Some(e);
421
422 break;
423 };
424
425 let time_until = front_entity.get_time_until();
426
427 if time_until <= TimerExpMode::from(cur_timestamp)
428 {
429 if let Some(item) = self.deque_timeout_list.pop_front().unwrap().into_inner()
430 {
431 to_remove.push(item);
432 }
433
434 }
436 else
437 {
438 if last_time_until != TimerExpMode::None || force_timer_resched == true
440 {
441 self.reschedule_timer()?;
443 }
444
445 break;
446 }
447
448 last_time_until = time_until;
449 }
450
451 *timeout_items = to_remove;
452
453 return error.map_or(Ok(()), |f| Err(f));
454 }
455
456}
457
458#[cfg(test)]
459mod tests
460{
461 use std::time::{Duration, Instant};
462
463 use crate::{common, timer::OrderedTimerDeque, timer_portable::timer::TimerReadRes, timer_tickets::{TimerDequeueId, TimerDequeueTicketIssuer}};
464
465 #[test]
466 fn test_ticket_0()
467 {
468 let mut time_list =
469 OrderedTimerDeque
470 ::<TimerDequeueTicketIssuer>
471 ::new("test_label".into(), 4, false).unwrap();
472
473
474 let s = Instant::now();
475
476 let tss_set = common::get_current_timestamp().timestamp()+2;
477
478 let ticket = time_list.add_to_timer(tss_set, 0).unwrap();
479
480 println!("ticket issued: {}", ticket);
481
482 for _ in 0..3
483 {
484 let event = time_list.wait_for_event().unwrap();
485 if let TimerReadRes::WouldBlock = event
486 {
487 std::thread::sleep(Duration::from_millis(1001));
488
489 continue;
490 }
491
492 assert_eq!(event, TimerReadRes::Ok(1));
493
494 let e = s.elapsed();
495 let ts = common::get_current_timestamp();
496
497 assert_eq!(ts.timestamp(), tss_set);
498 println!("instant: {:?} ts: {}, timerset: {}", e, ts.timestamp(), tss_set);
499
500 let mut tm_items: Vec<TimerDequeueId> = Vec::with_capacity(1);
501
502 time_list.timeout_event_handler(event, &mut tm_items).unwrap();
503
504 println!("tickets: {}", tm_items[0]);
505
506 assert_eq!(tm_items.len(), 1);
507 assert_eq!(tm_items.first().is_some(), true);
508 assert_eq!(tm_items.first().unwrap(), &ticket);
509
510 return;
511 }
512
513 panic!("timeout befor timer!");
514 }
515}