timer_deque_rs/
timer_consumer.rs1use std::fmt;
16
17use crate::error::{TimerError, TimerErrorType, TimerResult};
18use crate::timer::{OrderedTimerDeque, OrderedTimerDequeIntrf};
19use crate::{common, timer_err, TimerReadRes};
20use crate::timer_portable::timer::TimerExpMode;
21
22
23#[derive(Debug)]
44pub struct TimerDequeueConsumer<R: PartialEq + Eq + fmt::Debug + fmt::Display + Send>
45{
46 target: R,
48
49 timeout_absolute: TimerExpMode,
51}
52
53impl<R> fmt::Display
54for TimerDequeueConsumer<R>
55where R: PartialEq + Eq + fmt::Display + fmt::Debug + Send
56{
57 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> Result<(), std::fmt::Error>
58 {
59 write!(f, "{} until: {}", self.target, self.timeout_absolute)
60 }
61}
62
63
64impl<R: PartialEq + Eq + fmt::Display + fmt::Debug + Send> TimerDequeueConsumer<R>
65{
66 fn new(target: R, abs_time_sec: i64, abs_time_nsec: i64) -> TimerResult<Self>
67 {
68 let abs_time_nsec = abs_time_nsec & 999_999_999;
69
70 let cur = common::get_current_timestamp();
71 let req = TimerExpMode::OneShot { sec: abs_time_sec, nsec: abs_time_nsec };
72
73 if TimerExpMode::from(cur) > req
74 {
75 timer_err!(TimerErrorType::Expired, "time already expired");
76 }
77
78 return Ok(
79 Self
80 {
81 target:
82 target,
83 timeout_absolute:
84 req
85 }
86 );
87 }
88
89 fn get_time_until(&self) -> TimerExpMode
90 {
91 return self.timeout_absolute;
92 }
93
94 fn into_inner(self) -> R
95 {
96 return self.target;
97 }
98}
99
100impl<R: PartialEq + Eq + fmt::Debug + fmt::Display + Send> Eq for TimerDequeueConsumer<R> {}
101
102impl<R: PartialEq + Eq + fmt::Display + fmt::Debug + Send> PartialEq for TimerDequeueConsumer<R>
103{
104 fn eq(&self, other: &Self) -> bool
105 {
106 return self.target == other.target;
107 }
108}
109
110impl<R: PartialEq + Eq + fmt::Display + fmt::Debug + Send> PartialEq<R> for TimerDequeueConsumer<R>
111{
112 fn eq(&self, other: &R) -> bool
113 {
114 return &self.target == other;
115 }
116}
117
118impl<R: PartialEq + Eq + fmt::Display + fmt::Debug + Send> Ord for TimerDequeueConsumer<R>
119{
120 fn cmp(&self, other: &Self) -> std::cmp::Ordering
121 {
122 return self.timeout_absolute.cmp(&other.timeout_absolute);
123 }
124}
125
126impl<R: PartialEq + Eq + fmt::Display + fmt::Debug + Send> PartialOrd for TimerDequeueConsumer<R>
127{
128 fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering>
129 {
130 return Some(self.cmp(other));
131 }
132}
133
134impl<R> OrderedTimerDequeIntrf for TimerDequeueConsumer<R>
135where R: PartialEq + Eq + fmt::Display + fmt::Debug + Send
136{
137 type Target = R;
139
140 type Ticket = common::NoTicket;
142
143 #[inline]
144 fn wrap(target: Self::Target, abs_time_sec: i64, abs_time_nsec: i64) -> TimerResult<(Self, Self::Ticket)>
145 {
146 return Ok((Self::new(target, abs_time_sec, abs_time_nsec)?, common::NoTicket));
147 }
148
149 #[inline]
150 fn get_timeout_absolute(&self) -> TimerExpMode
151 {
152 return self.timeout_absolute;
153 }
154}
155
156impl<R> OrderedTimerDeque<TimerDequeueConsumer<R>>
157where R: PartialEq + Eq + fmt::Display + fmt::Debug + Send
158{
159 pub
179 fn add_to_timer(&mut self, entity: R, abs_time_sec: i64, abs_time_nsec: i64) -> TimerResult<common::NoTicket>
180 {
181 return self.add_to_timer_local(entity, abs_time_sec, abs_time_nsec);
182 }
183
184 pub
201 fn remove_from_sched_queue(&mut self, entity: &R) -> TimerResult<Option<R>>
202 {
203 if self.deque_timeout_list.len() == 0
204 {
205 timer_err!(TimerErrorType::QueueEmpty, "queue list is empty!");
206 }
207 else
208 {
209 if self.deque_timeout_list.len() == 1
211 {
212 let ret_ent = self.deque_timeout_list.pop_front().unwrap();
214
215 self.stop_timer()?;
217
218 return Ok(Some(ret_ent.into_inner()));
219 }
220 else
221 {
222 for (pos, q_item)
226 in self.deque_timeout_list.iter().enumerate()
227 {
228 if q_item == entity
229 {
230 let ret_ent =
232 self.deque_timeout_list.remove(pos).unwrap().into_inner();
233
234 if pos == 0
236 {
237 self.reschedule_timer()?;
238 }
239
240 return Ok(Some(ret_ent));
241 }
242 }
243
244 return Ok(None);
245 }
246 }
247 }
248
249 pub
264 fn timeout_event_handler(&mut self, res: TimerReadRes<u64>, timeout_items: &mut Vec<R>) -> TimerResult<()>
265 {
266 if let TimerReadRes::WouldBlock = res
268 {
269 return Ok(());
270 }
271
272 let force_timer_resched =
274 TimerReadRes::Cancelled == res || TimerReadRes::Ok(0) == res;
275
276 let mut timeouts: Vec<R> = Vec::new();
277 let mut error: Option<TimerError> = None;
278
279 let cur_timestamp = common::get_current_timestamp();
280
281 loop
282 {
283 let Some(front_entity) =
285 self.deque_timeout_list.front()
286 else
287 {
288 let Err(e) = self.stop_timer()
289 else
290 {
291 break;
292 };
293
294 error = Some(e);
295
296 break;
297 };
298
299 let time_until = front_entity.get_time_until();
300
301 if time_until <= TimerExpMode::from(cur_timestamp)
302 {
303 timeouts.push(self.deque_timeout_list.pop_front().unwrap().into_inner());
304 }
305 else
306 {
307 if timeouts.is_empty() == false || force_timer_resched == true
308 {
309 self.reschedule_timer()?;
311 }
312
313 break;
314 }
315 }
316
317 *timeout_items = timeouts;
318
319 return error.map_or(Ok(()), |f| Err(f));
320 }
321
322}
323
#[cfg(test)]
mod tests {
    use std::{fmt, time::{Duration, Instant}};

    use crate::{common, timer::OrderedTimerDeque, timer_consumer::TimerDequeueConsumer, timer_portable::timer::TimerReadRes};

    /// Queues a single item two seconds ahead, then polls the timer up to
    /// three times (sleeping 1001 ms whenever the read would block) and checks
    /// that exactly this item is reported as timed out at the requested second.
    #[test]
    fn test_0() {
        #[derive(Debug, PartialEq, Eq, Clone)]
        struct TestItem(u64);

        impl fmt::Display for TestItem {
            fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
                write!(f, "0 = {}", self.0)
            }
        }

        let mut queue =
            OrderedTimerDeque::<TimerDequeueConsumer<TestItem>>::new("test_label".into(), 4, false)
                .unwrap();

        let started = Instant::now();

        let deadline_sec = common::get_current_timestamp().timestamp() + 2;
        queue.add_to_timer(TestItem(1), deadline_sec, 0).unwrap();

        for _ in 0..3 {
            let event = queue.wait_for_event().unwrap();

            // timer has not fired yet: wait a bit more than a second and poll again
            if matches!(event, TimerReadRes::WouldBlock) {
                std::thread::sleep(Duration::from_millis(1001));
                continue;
            }

            assert_eq!(event, TimerReadRes::Ok(1));

            let elapsed = started.elapsed();
            let now = common::get_current_timestamp();

            assert_eq!(now.timestamp(), deadline_sec);
            println!("instant: {:?} ts: {}, timerset: {}", elapsed, now.timestamp(), deadline_sec);

            let mut expired: Vec<TestItem> = Vec::with_capacity(1);

            queue.timeout_event_handler(event, &mut expired).unwrap();

            assert_eq!(expired.len(), 1);
            assert!(expired.first().is_some());
            assert_eq!(expired.first().unwrap(), &TestItem(1));

            return;
        }

        panic!("timeout befor timer!");
    }
}