// rust_rcs_core/util/timer.rs

// Copyright 2023 宋昊文
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::panic::{self, AssertUnwindSafe};
use std::sync::{Arc, Condvar, Mutex, PoisonError};
use std::thread;
use std::time::{Duration, Instant};

/// Boxed one-shot callback that can be moved to the timer thread.
type Callback = Box<dyn FnOnce() + Send + 'static>;

/// A pending timer entry.
struct TimerSource {
    /// Instant at (or after) which `cb` becomes due.
    t: Instant,
    /// Function to invoke once the deadline has passed.
    cb: Callback,
}

/// State shared between a `Timer` handle and its worker thread; always accessed under the mutex.
struct TimerState {
    /// Pending entries, kept sorted by ascending deadline.
    sources: Vec<TimerSource>,
    /// Set when the owning `Timer` is dropped; tells the worker thread to exit.
    shutdown: bool,
}

/// A timer that runs scheduled one-shot callbacks on a dedicated thread.
///
/// Dropping the `Timer` stops the thread. Callbacks that are not yet due at that
/// point are discarded without being invoked.
pub struct Timer {
    cond_pair: Arc<(Mutex<TimerState>, Condvar)>,
    thread: Option<thread::JoinHandle<()>>,
}

impl Timer {
    /// Create a new Timer
    ///
    /// timer callbacks are invoked on its inner thread
    pub fn new() -> Timer {
        let state = TimerState {
            sources: Vec::with_capacity(16),
            shutdown: false,
        };
        let cond_pair = Arc::new((Mutex::new(state), Condvar::new()));

        let worker_pair = Arc::clone(&cond_pair);
        let thread = thread::spawn(move || Timer::run(&worker_pair));

        Timer {
            cond_pair,
            thread: Some(thread),
        }
    }

    /// Schedule a function that will be called once after 'delay'
    ///
    /// Callbacks are invoked in deadline order. The execution order of functions
    /// scheduled for the same instant is not guaranteed.
    ///
    /// The callback runs without any internal lock held, so it may itself call
    /// `schedule` on the same timer. A panic inside a callback is contained and
    /// does not stop the remaining timers from firing.
    ///
    /// # Warning
    ///
    /// Timer callbacks should be executed as swiftly as possible, because all
    /// callbacks share one thread; if you need to block inside the callback,
    /// hand the work over to another thread.
    ///
    /// # Panics
    ///
    /// The `schedule` function will panic if `delay` is `Duration::ZERO`, or if
    /// the resulting deadline cannot be represented as an `Instant`.
    pub fn schedule<F>(&self, delay: Duration, f: F)
    where
        F: FnOnce() + Send + 'static,
    {
        assert!(delay > Duration::ZERO);

        let deadline = Instant::now()
            .checked_add(delay)
            .expect("timer deadline is too far in the future");

        let (mutex, cond) = &*self.cond_pair;

        let mut state = mutex.lock().unwrap_or_else(PoisonError::into_inner);

        // Insert after every entry with an earlier or equal deadline so the
        // vector stays sorted and the worker only has to look at the front.
        let at = state.sources.partition_point(|s| s.t <= deadline);
        state.sources.insert(
            at,
            TimerSource {
                t: deadline,
                cb: Box::new(f),
            },
        );

        // Wake the worker so it can re-evaluate the earliest deadline.
        cond.notify_one();
    }

    /// Worker loop: waits for the earliest deadline, runs due callbacks, exits on shutdown.
    fn run(pair: &(Mutex<TimerState>, Condvar)) {
        let (mutex, cond) = pair;

        let mut state = mutex.lock().unwrap_or_else(PoisonError::into_inner);

        loop {
            let now = Instant::now();

            // `sources` is sorted, so everything before this index is due.
            let due_count = state.sources.partition_point(|s| s.t <= now);

            if due_count > 0 {
                let due: Vec<TimerSource> = state.sources.drain(..due_count).collect();

                // Release the lock while user code runs: callbacks may call
                // `schedule`, and other threads must not be blocked by them.
                drop(state);

                for source in due {
                    // Contain panics so one faulty callback cannot kill the
                    // thread; the panic hook has already reported it.
                    let _ = panic::catch_unwind(AssertUnwindSafe(source.cb));
                }

                state = mutex.lock().unwrap_or_else(PoisonError::into_inner);
                continue;
            }

            if state.shutdown {
                return;
            }

            let timeout = state
                .sources
                .first()
                .map(|next| next.t.saturating_duration_since(now));

            // Spurious wake-ups are harmless: the loop re-checks everything.
            state = match timeout {
                Some(timeout) => {
                    cond.wait_timeout(state, timeout)
                        .unwrap_or_else(PoisonError::into_inner)
                        .0
                }
                None => cond.wait(state).unwrap_or_else(PoisonError::into_inner),
            };
        }
    }
}

impl Default for Timer {
    /// Equivalent to [`Timer::new`].
    fn default() -> Self {
        Timer::new()
    }
}

impl Drop for Timer {
    /// Ask the worker thread to exit and wait for it to finish.
    fn drop(&mut self) {
        {
            let (mutex, cond) = &*self.cond_pair;

            // Never panic here: `drop` may run during unwinding.
            let mut state = mutex.lock().unwrap_or_else(PoisonError::into_inner);

            state.shutdown = true;

            cond.notify_one();
        }

        if let Some(handle) = self.thread.take() {
            // If the last handle was released from inside a callback we are on
            // the worker thread itself; joining would deadlock, so detach and
            // let the loop observe `shutdown` on its next iteration.
            if handle.thread().id() != thread::current().id() {
                // A join error cannot be acted upon inside `drop`.
                let _ = handle.join();
            }
        }
    }
}

#[cfg(test)]
mod tests {
    use std::sync::mpsc;
    use std::time::Duration;

    use super::Timer;

    /// Spacing between consecutive deadlines; wide enough that scheduling
    /// jitter cannot reorder them, small enough to keep the test fast.
    const STEP: Duration = Duration::from_millis(100);

    /// Upper bound for a single wait, so a broken timer fails the test
    /// instead of hanging it.
    const RECV_TIMEOUT: Duration = Duration::from_secs(5);

    /// Callbacks must fire in deadline order regardless of scheduling order.
    #[test]
    fn multiple_sources() {
        let timer = Timer::new();
        let (tx, rx) = mpsc::channel();

        // Scheduled in ascending deadline order.
        for i in 1..6u32 {
            let tx = tx.clone();
            timer.schedule(STEP * i, move || {
                tx.send(i).unwrap();
            });
        }

        for expected in 1..6u32 {
            assert_eq!(Ok(expected), rx.recv_timeout(RECV_TIMEOUT));
        }

        // Scheduled in descending deadline order: the last one scheduled fires first.
        for i in 1..6u32 {
            let tx = tx.clone();
            timer.schedule(STEP * (6 - i), move || {
                tx.send(i).unwrap();
            });
        }

        for expected in (1..6u32).rev() {
            assert_eq!(Ok(expected), rx.recv_timeout(RECV_TIMEOUT));
        }
    }
}