1use std::{fmt, sync::{Arc, Weak}};
16
17use rand::RngCore;
18
19use crate::
20{
21 common,
22 error::{TimerError, TimerErrorType, TimerResult},
23 timer::{OrderedTimerDeque, OrderedTimerDequeIntrf},
24 timer_err,
25 timer_portable::timer::TimerExpMode, TimerReadRes
26};
27
28#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
31pub struct TimerDequeueId
32{
33 timestamp: i64,
35
36 timestamp_ns: u32,
38
39 ran_id: u32,
41}
42
43impl fmt::Display for TimerDequeueId
44{
45 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result
46 {
47 write!(f, "{:X}/{:X}/{:X}", self.timestamp, self.timestamp_ns, self.ran_id)
48 }
49}
50
51impl PartialEq<TimerDequeueTicket> for TimerDequeueId
52{
53 fn eq(&self, other: &TimerDequeueTicket) -> bool
54 {
55 return self == other.status.as_ref();
56 }
57}
58
59impl AsRef<TimerDequeueId> for TimerDequeueId
60{
61 fn as_ref(&self) -> &TimerDequeueId
62 {
63 return self;
64 }
65}
66
67impl TimerDequeueId
68{
69 fn new() -> Self
70 {
71 let mut rng = rand::rng();
72
73 let ts = common::get_current_timestamp();
74
75 return
76 Self
77 {
78 timestamp: ts.timestamp(),
79 timestamp_ns: ts.timestamp_subsec_nanos(),
80 ran_id: rng.next_u32(),
81 };
82 }
83}
84
85#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
86pub struct TimerDequeueTicket
87{
88 status: Arc<TimerDequeueId>,
89}
90
91impl fmt::Display for TimerDequeueTicket
92{
93 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result
94 {
95 write!(f, "{}", self.status)
96 }
97}
98
99impl PartialEq<TimerDequeueId> for TimerDequeueTicket
100{
101 fn eq(&self, other: &TimerDequeueId) -> bool
102 {
103 return self.status.as_ref() == other;
104 }
105}
106
107impl PartialEq<TimerDequeueTicketIssuer> for TimerDequeueTicket
108{
109 fn eq(&self, other: &TimerDequeueTicketIssuer) -> bool
110 {
111 let Some(up) = other.weak_status.upgrade()
112 else { return false };
113
114 return self.status.as_ref() == up.as_ref();
115 }
116}
117
118impl AsRef<TimerDequeueId> for TimerDequeueTicket
119{
120 fn as_ref(&self) -> &TimerDequeueId
121 {
122 return &self.status;
123 }
124}
125
126impl TimerDequeueTicket
127{
128 fn new() -> Self
129 {
130 return Self{ status: Arc::new(TimerDequeueId::new() ) }
131 }
132
133 fn pair(&self) -> Weak<TimerDequeueId>
134 {
135 return Arc::downgrade(&self.status);
136 }
137
138 pub
139 fn get_deque_id(&self) -> &TimerDequeueId
140 {
141 return self.status.as_ref();
142 }
143
144 pub
145 fn is_queued(&self) -> bool
146 {
147 return Arc::weak_count(&self.status) > 1;
148 }
149}
150
151#[derive(Debug)]
165pub struct TimerDequeueTicketIssuer
166{
167 weak_status: Weak<TimerDequeueId>,
169
170 timeout_absolute: TimerExpMode,
172}
173
174impl fmt::Display for TimerDequeueTicketIssuer
175{
176 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> Result<(), std::fmt::Error>
177 {
178 write!(f, "{} until: {}", self.weak_status.upgrade().map_or("dropped".into(), |f| f.to_string()),
179 self.timeout_absolute)
180 }
181}
182
183impl Eq for TimerDequeueTicketIssuer {}
184
185impl PartialEq for TimerDequeueTicketIssuer
186{
187 fn eq(&self, other: &Self) -> bool
188 {
189 let s = self.weak_status.upgrade();
190 let o = other.weak_status.upgrade();
191
192 return s == o;
193 }
194}
195
196impl PartialEq<TimerDequeueTicket> for TimerDequeueTicketIssuer
197{
198 fn eq(&self, other: &TimerDequeueTicket) -> bool
199 {
200 let Some(s) = self.weak_status.upgrade()
201 else { return false };
202
203 return s.as_ref() == other.status.as_ref();
204 }
205}
206
207impl Ord for TimerDequeueTicketIssuer
208{
209 fn cmp(&self, other: &Self) -> std::cmp::Ordering
210 {
211 return self.timeout_absolute.cmp(&other.timeout_absolute);
212 }
213}
214
215impl PartialOrd for TimerDequeueTicketIssuer
216{
217 fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering>
218 {
219 return Some(self.cmp(other));
220 }
221}
222
223
224impl TimerDequeueTicketIssuer
225{
226 fn new(abs_time_sec: i64, abs_time_nsec: i64) -> TimerResult<(Self, TimerDequeueTicket)>
228 {
229 let abs_time_nsec = abs_time_nsec & 999_999_999;
230
231 let cur = common::get_current_timestamp();
232 let req = TimerExpMode::OneShot { sec: abs_time_sec, nsec: abs_time_nsec };
233
234 if TimerExpMode::from(cur) > req
235 {
236 timer_err!(TimerErrorType::Expired, "time already expired");
237 }
238
239
240 let ext_ticket =
241 TimerDequeueTicket::new();
242
243 let int_ticket =
244 Self
245 {
246 weak_status: ext_ticket.pair(),
247 timeout_absolute: req
248 };
249
250
251 return Ok((int_ticket, ext_ticket));
252 }
253
254 fn get_time_until(&self) -> TimerExpMode
256 {
257 return self.timeout_absolute;
258 }
259
260 fn is_valid(&self) -> bool
262 {
263 return self.weak_status.upgrade().is_some();
264 }
265
266 fn into_inner(&self) -> Option<TimerDequeueId>
269 {
270 return self.weak_status.upgrade().map(|f| *f.as_ref());
271 }
272}
273
274
275impl OrderedTimerDequeIntrf for TimerDequeueTicketIssuer
276{
277 type Target = common::NoTarget;
279
280 type Ticket = TimerDequeueTicket;
282
283 #[inline]
284 fn wrap(_target: Self::Target, abs_time_sec: i64, abs_time_nsec: i64) -> TimerResult<(Self, Self::Ticket)>
285 {
286 return Self::new(abs_time_sec, abs_time_nsec);
287 }
288
289 #[inline]
290 fn get_timeout_absolute(&self) -> TimerExpMode
291 {
292 return self.timeout_absolute;
293 }
294}
295
296impl OrderedTimerDeque<TimerDequeueTicketIssuer>
297{
298 #[inline]
314 pub
315 fn add_to_timer(&mut self, abs_time_sec: i64, abs_time_nsec: i64) -> TimerResult<TimerDequeueTicket>
316 {
317 return self.add_to_timer_local(common::NoTarget, abs_time_sec, abs_time_nsec);
318 }
319
320 pub
334 fn remove_from_sched_queue(&mut self, ticket: TimerDequeueTicket) -> TimerResult<()>
335 {
336 if self.deque_timeout_list.len() == 0
337 {
338 timer_err!(TimerErrorType::QueueEmpty, "queue list is empty!");
339 }
340 else
341 {
342 if self.deque_timeout_list.len() == 1
344 {
345 let _ = self.deque_timeout_list.pop_front();
347
348 self.stop_timer()?;
350
351 return Ok(());
352 }
353 else
354 {
355 for (pos, q_item)
359 in self.deque_timeout_list.iter().enumerate()
360 {
361 if q_item == &ticket
362 {
363 let _ =
365 self.deque_timeout_list.remove(pos).unwrap().into_inner();
366
367 if pos == 0
369 {
370 self.reschedule_timer()?;
371 }
372
373 return Ok(());
374 }
375 }
376
377 return Ok(());
378 }
379 }
380 }
381
382 pub
397 fn timeout_event_handler(&mut self, res: TimerReadRes<u64>, timeout_items: &mut Vec<TimerDequeueId>) -> TimerResult<()>
398 {
399 if let TimerReadRes::WouldBlock = res
401 {
402 return Ok(());
403 }
404
405 let force_timer_resched =
407 TimerReadRes::Cancelled == res || TimerReadRes::Ok(0) == res;
408
409
410 let mut to_remove: Vec<TimerDequeueId> = Vec::new();
411 let mut error: Option<TimerError> = None;
412 let mut last_time_until: TimerExpMode = TimerExpMode::None;
413 let cur_timestamp = common::get_current_timestamp();
414
415 loop
416 {
417 let Some(front_entity) =
419 self.deque_timeout_list.front()
420 else
421 {
422 let Err(e) = self.stop_timer()
424 else
425 {
426 break;
427 };
428
429 error = Some(e);
430
431 break;
432 };
433
434 let time_until = front_entity.get_time_until();
435
436 if time_until <= TimerExpMode::from(cur_timestamp)
437 {
438 if let Some(item) = self.deque_timeout_list.pop_front().unwrap().into_inner()
439 {
440 to_remove.push(item);
441 }
442
443 }
445 else
446 {
447 if last_time_until != TimerExpMode::None || force_timer_resched == true
449 {
450 self.reschedule_timer()?;
452 }
453
454 break;
455 }
456
457 last_time_until = time_until;
458 }
459
460 *timeout_items = to_remove;
461
462 return error.map_or(Ok(()), |f| Err(f));
463 }
464
465}
466
467#[cfg(test)]
468mod tests
469{
470 use std::time::{Duration, Instant};
471
472 use crate::{common, timer::OrderedTimerDeque, timer_portable::timer::TimerReadRes, timer_tickets::{TimerDequeueId, TimerDequeueTicketIssuer}};
473
474 #[test]
475 fn test_ticket_0()
476 {
477 let mut time_list =
478 OrderedTimerDeque
479 ::<TimerDequeueTicketIssuer>
480 ::new("test_label".into(), 4, false).unwrap();
481
482
483 let s = Instant::now();
484
485 let tss_set = common::get_current_timestamp().timestamp()+2;
486
487 let ticket = time_list.add_to_timer(tss_set, 0).unwrap();
488
489 println!("ticket issued: {}", ticket);
490
491 for _ in 0..3
492 {
493 let event = time_list.wait_for_event().unwrap();
494 if let TimerReadRes::WouldBlock = event
495 {
496 std::thread::sleep(Duration::from_millis(1001));
497
498 continue;
499 }
500
501 assert_eq!(event, TimerReadRes::Ok(1));
502
503 let e = s.elapsed();
504 let ts = common::get_current_timestamp();
505
506 assert_eq!(ts.timestamp(), tss_set);
507 println!("instant: {:?} ts: {}, timerset: {}", e, ts.timestamp(), tss_set);
508
509 let mut tm_items: Vec<TimerDequeueId> = Vec::with_capacity(1);
510
511 time_list.timeout_event_handler(event, &mut tm_items).unwrap();
512
513 println!("tickets: {}", tm_items[0]);
514
515 assert_eq!(tm_items.len(), 1);
516 assert_eq!(tm_items.first().is_some(), true);
517 assert_eq!(tm_items.first().unwrap(), &ticket);
518
519 return;
520 }
521
522 panic!("timeout befor timer!");
523 }
524}