timer_deque_rs/
timer_consumer.rs1use std::fmt;
16
17use crate::error::{TimerError, TimerErrorType, TimerResult};
18use crate::timer::{OrderedTimerDeque, OrderedTimerDequeIntrf};
19use crate::{common, timer_err, TimerReadRes};
20use crate::timer_portable::timer::TimerExpMode;
21
22
23#[derive(Debug)]
35pub struct TimerDequeueConsumer<R: PartialEq + Eq + fmt::Debug + fmt::Display + Send>
36{
37 target: R,
39
40 timeout_absolute: TimerExpMode,
42}
43
44impl<R> fmt::Display
45for TimerDequeueConsumer<R>
46where R: PartialEq + Eq + fmt::Display + fmt::Debug + Send
47{
48 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> Result<(), std::fmt::Error>
49 {
50 write!(f, "{} until: {}", self.target, self.timeout_absolute)
51 }
52}
53
54
55impl<R: PartialEq + Eq + fmt::Display + fmt::Debug + Send> TimerDequeueConsumer<R>
56{
57 fn new(target: R, abs_time_sec: i64, abs_time_nsec: i64) -> TimerResult<Self>
58 {
59 let abs_time_nsec = abs_time_nsec & 999_999_999;
60
61 let cur = common::get_current_timestamp();
62 let req = TimerExpMode::OneShot { sec: abs_time_sec, nsec: abs_time_nsec };
63
64 if TimerExpMode::from(cur) > req
65 {
66 timer_err!(TimerErrorType::Expired, "time already expired");
67 }
68
69 return Ok(
70 Self
71 {
72 target:
73 target,
74 timeout_absolute:
75 req
76 }
77 );
78 }
79
80 fn get_time_until(&self) -> TimerExpMode
81 {
82 return self.timeout_absolute;
83 }
84
85 fn into_inner(self) -> R
86 {
87 return self.target;
88 }
89}
90
91impl<R: PartialEq + Eq + fmt::Debug + fmt::Display + Send> Eq for TimerDequeueConsumer<R> {}
92
93impl<R: PartialEq + Eq + fmt::Display + fmt::Debug + Send> PartialEq for TimerDequeueConsumer<R>
94{
95 fn eq(&self, other: &Self) -> bool
96 {
97 return self.target == other.target;
98 }
99}
100
101impl<R: PartialEq + Eq + fmt::Display + fmt::Debug + Send> PartialEq<R> for TimerDequeueConsumer<R>
102{
103 fn eq(&self, other: &R) -> bool
104 {
105 return &self.target == other;
106 }
107}
108
109impl<R: PartialEq + Eq + fmt::Display + fmt::Debug + Send> Ord for TimerDequeueConsumer<R>
110{
111 fn cmp(&self, other: &Self) -> std::cmp::Ordering
112 {
113 return self.timeout_absolute.cmp(&other.timeout_absolute);
114 }
115}
116
117impl<R: PartialEq + Eq + fmt::Display + fmt::Debug + Send> PartialOrd for TimerDequeueConsumer<R>
118{
119 fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering>
120 {
121 return Some(self.cmp(other));
122 }
123}
124
125impl<R> OrderedTimerDequeIntrf for TimerDequeueConsumer<R>
126where R: PartialEq + Eq + fmt::Display + fmt::Debug + Send
127{
128 type Target = R;
130
131 type Ticket = common::NoTicket;
133
134 #[inline]
135 fn wrap(target: Self::Target, abs_time_sec: i64, abs_time_nsec: i64) -> TimerResult<(Self, Self::Ticket)>
136 {
137 return Ok((Self::new(target, abs_time_sec, abs_time_nsec)?, common::NoTicket));
138 }
139
140 #[inline]
141 fn get_timeout_absolute(&self) -> TimerExpMode
142 {
143 return self.timeout_absolute;
144 }
145}
146
147impl<R> OrderedTimerDeque<TimerDequeueConsumer<R>>
148where R: PartialEq + Eq + fmt::Display + fmt::Debug + Send
149{
150 pub
170 fn add_to_timer(&mut self, entity: R, abs_time_sec: i64, abs_time_nsec: i64) -> TimerResult<common::NoTicket>
171 {
172 return self.add_to_timer_local(entity, abs_time_sec, abs_time_nsec);
173 }
174
175 pub
192 fn remove_from_sched_queue(&mut self, entity: &R) -> TimerResult<Option<R>>
193 {
194 if self.deque_timeout_list.len() == 0
195 {
196 timer_err!(TimerErrorType::QueueEmpty, "queue list is empty!");
197 }
198 else
199 {
200 if self.deque_timeout_list.len() == 1
202 {
203 let ret_ent = self.deque_timeout_list.pop_front().unwrap();
205
206 self.stop_timer()?;
208
209 return Ok(Some(ret_ent.into_inner()));
210 }
211 else
212 {
213 for (pos, q_item)
217 in self.deque_timeout_list.iter().enumerate()
218 {
219 if q_item == entity
220 {
221 let ret_ent =
223 self.deque_timeout_list.remove(pos).unwrap().into_inner();
224
225 if pos == 0
227 {
228 self.reschedule_timer()?;
229 }
230
231 return Ok(Some(ret_ent));
232 }
233 }
234
235 return Ok(None);
236 }
237 }
238 }
239
240 pub
255 fn timeout_event_handler(&mut self, res: TimerReadRes<u64>, timeout_items: &mut Vec<R>) -> TimerResult<()>
256 {
257 if let TimerReadRes::WouldBlock = res
259 {
260 return Ok(());
261 }
262
263 let force_timer_resched =
265 TimerReadRes::Cancelled == res || TimerReadRes::Ok(0) == res;
266
267 let mut timeouts: Vec<R> = Vec::new();
268 let mut error: Option<TimerError> = None;
269
270 let cur_timestamp = common::get_current_timestamp();
271
272 loop
273 {
274 let Some(front_entity) =
276 self.deque_timeout_list.front()
277 else
278 {
279 let Err(e) = self.stop_timer()
280 else
281 {
282 break;
283 };
284
285 error = Some(e);
286
287 break;
288 };
289
290 let time_until = front_entity.get_time_until();
291
292 if time_until <= TimerExpMode::from(cur_timestamp)
293 {
294 timeouts.push(self.deque_timeout_list.pop_front().unwrap().into_inner());
295 }
296 else
297 {
298 if timeouts.is_empty() == false || force_timer_resched == true
299 {
300 self.reschedule_timer()?;
302 }
303
304 break;
305 }
306 }
307
308 *timeout_items = timeouts;
309
310 return error.map_or(Ok(()), |f| Err(f));
311 }
312
313}
314
#[cfg(test)]
mod tests {
    use std::{
        fmt,
        time::{Duration, Instant},
    };

    use crate::{
        common, timer::OrderedTimerDeque, timer_consumer::TimerDequeueConsumer,
        timer_portable::timer::TimerReadRes,
    };

    /// Schedules a single item two seconds ahead, polls the timer up to three
    /// times (sleeping whenever the read would block) and checks that the item
    /// is reported by `timeout_event_handler` in the scheduled second.
    #[test]
    fn test_0() {
        #[derive(Debug, PartialEq, Eq, Clone)]
        struct TestItem(u64);

        impl fmt::Display for TestItem {
            fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
                write!(f, "0 = {}", self.0)
            }
        }

        let mut time_list =
            OrderedTimerDeque::<TimerDequeueConsumer<TestItem>>::new("test_label".into(), 4, false)
                .unwrap();

        let started = Instant::now();

        let tss_set = common::get_current_timestamp().timestamp() + 2;
        time_list.add_to_timer(TestItem(1), tss_set, 0).unwrap();

        for _ in 0..3 {
            let event = time_list.wait_for_event().unwrap();

            // Timer has not fired yet: wait a bit more than a second and retry.
            if matches!(event, TimerReadRes::WouldBlock) {
                std::thread::sleep(Duration::from_millis(1001));

                continue;
            }

            assert_eq!(event, TimerReadRes::Ok(1));

            let elapsed = started.elapsed();
            let ts = common::get_current_timestamp();

            assert_eq!(ts.timestamp(), tss_set);
            println!("instant: {:?} ts: {}, timerset: {}", elapsed, ts.timestamp(), tss_set);

            let mut tm_items: Vec<TestItem> = Vec::with_capacity(1);

            time_list.timeout_event_handler(event, &mut tm_items).unwrap();

            assert_eq!(tm_items.len(), 1);
            assert!(tm_items.first().is_some());
            assert_eq!(tm_items.first().unwrap(), &TestItem(1));

            return;
        }

        panic!("timeout befor timer!");
    }
}