task_scheduler/
lib.rs

1use std::time::{Duration, Instant};
2use std::sync::{Arc, Mutex, Condvar};
3use std::collections::BinaryHeap;
4use std::thread;
5use std::cmp::{Ord, PartialOrd, Ordering, Eq};
6
/// A callback waiting in the scheduler queue, paired with the moment it becomes due.
struct Entry {
	/// Earliest point in time at which `callback` may be invoked.
	pub instant: Instant,
	/// The work to run. Written with an explicit `dyn`: the bare
	/// `Box<FnMut() + Send>` spelling is deprecated and is rejected outright
	/// by the 2021 edition.
	pub callback: Box<dyn FnMut() + Send>,
}
11
12impl PartialOrd for Entry {
13	fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
14		Some(self.cmp(other))
15	}
16}
17
// Marker impl: `Entry` equality compares only the `instant` fields, and
// `Instant` equality is itself a full equivalence relation, so `Eq` holds.
// `Ord` (needed to store entries in a `BinaryHeap`) requires this impl.
impl Eq for Entry {}
19
20impl PartialEq for Entry {
21	fn eq(&self, other: &Self) -> bool {
22		self.instant == other.instant
23	}
24}
25
26impl Ord for Entry {
27	fn cmp(&self, other: &Self) -> Ordering {
28		match self.instant.cmp(&other.instant) {
29			Ordering::Greater => Ordering::Less,
30			Ordering::Less => Ordering::Greater,
31			Ordering::Equal => Ordering::Equal
32		}
33	}
34}
35
/// State shared between a `Scheduler` handle and the worker thread it spawns.
struct SharedData {
	/// Notified after every push so a waiting worker re-examines the queue.
	pub cond_var: Condvar,
	/// Pending entries. `Entry`'s ordering is reversed, so the heap's top
	/// element is the one with the earliest due instant.
	pub callbacks: Mutex<BinaryHeap<Entry>>
}
40
/// Handle used to schedule closures for later execution on a background thread.
pub struct Scheduler {
	/// Queue and condition variable shared with the worker thread started by `new`.
	data: Arc<SharedData>
}
44
45impl Scheduler {
46	pub fn new() -> Scheduler {
47		let shared_data = Arc::new(SharedData {
48			cond_var: Condvar::new(),
49			callbacks: Mutex::new(BinaryHeap::new())
50		});
51		{
52			let shared_data = shared_data.clone();
53			thread::spawn(move || {
54				let mut callbacks = shared_data.callbacks.lock().unwrap();
55				loop {
56					let entry = callbacks.pop();
57					match entry {
58						Some(mut entry) => {
59							let now = Instant::now();
60							if entry.instant > now {
61								let wait_duration = entry.instant - now;
62								callbacks.push(entry);
63								callbacks = shared_data.cond_var
64										.wait_timeout(callbacks, wait_duration).unwrap().0;
65							} else {
66								(entry.callback)()
67							}
68						}
69						None => {
70							callbacks = shared_data.cond_var.wait(callbacks).unwrap();
71						}
72					}
73				}
74			});
75		}
76
77		Scheduler {
78			data: shared_data
79		}
80	}
81
82	pub fn after_instant<F>(&self, instant: Instant, func: F)
83		where F: FnOnce() + Send + 'static {
84		let mut func = Some(func);
85		self.data.callbacks.lock().unwrap().push(Entry {
86			instant,
87			callback: Box::new(move || {
88				if let Some(func) = func.take() {
89					(func)()
90				}
91			}),
92		});
93		self.data.cond_var.notify_all();
94	}
95
96	pub fn after_duration<F>(&self, duration: Duration, func: F)
97		where F: FnOnce() + Send + 'static {
98		self.after_instant(Instant::now() + duration, func)
99	}
100}
101
/// Checks that a callback scheduled a few milliseconds ahead is run by the
/// worker thread, and not before its due instant.
///
/// The callback reports back over a channel, so the test finishes as soon as
/// the callback fires instead of sleeping for a fixed interval and hoping the
/// worker got scheduled in time.
#[test]
fn test() {
	use std::sync::mpsc;

	let (sender, receiver) = mpsc::channel();
	let scheduler = Scheduler::new();
	let due = Instant::now() + Duration::from_millis(10);
	scheduler.after_instant(due, move || {
		// Ignore a send failure: it only means the test already gave up and
		// dropped the receiver, and panicking on the worker would add noise.
		let _ = sender.send(Instant::now());
	});

	// Generous upper bound; the wait ends as soon as the callback has run.
	let fired_at = receiver
		.recv_timeout(Duration::from_secs(5))
		.expect("scheduled callback was not run in time");
	assert!(fired_at >= due, "callback ran before its due instant");
}