1use crate::SafeGenerator;
2use std::{
3 collections::VecDeque,
4 ops::Deref,
5 sync::{Arc, Condvar, Mutex, RwLock},
6};
7
8pub trait AsyncRunnerTrait: Send + Sync {
9 fn run_all_blocking(&self);
10
11 fn run_async(&self, task: Task);
12}
13
14#[derive(Clone)]
15pub struct AsyncRunner(Arc<dyn AsyncRunnerTrait>);
16
17impl AsyncRunner {
18 pub fn new(selfish: impl AsyncRunnerTrait + 'static) -> Self {
19 Self(Arc::new(selfish))
20 }
21
22 pub fn run_async<T>(&self, gen: std::sync::Arc<dyn Fn() -> SafeGenerator<T> + Send + Sync>)
23 where
24 T: Clone + Send + Sync + 'static,
25 {
26 let gen = gen();
27 let gen_ignoring_result = Arc::new(move || {
29 let _ = gen.next();
30 });
31 self.0.run_async(gen_ignoring_result);
32 }
33}
34
35impl Deref for AsyncRunner {
36 type Target = dyn AsyncRunnerTrait;
37 fn deref(&self) -> &Self::Target {
38 &*self.0
39 }
40}
41
42struct SingleThreadAsyncRunnerStruct {
43 tasks: VecDeque<Task>,
44}
45
46pub struct SingleThreadAsyncRunner(Arc<RwLock<SingleThreadAsyncRunnerStruct>>);
47
48impl SingleThreadAsyncRunner {
49 pub fn new() -> AsyncRunner {
50 AsyncRunner::new(SingleThreadAsyncRunner(Arc::new(RwLock::new(
51 SingleThreadAsyncRunnerStruct {
52 tasks: VecDeque::new(),
53 },
54 ))))
55 }
56}
57
58impl AsyncRunnerTrait for SingleThreadAsyncRunner {
59 fn run_all_blocking(&self) {
60 loop {
61 let task = {
62 let mut lock = self.0.write().unwrap();
63 lock.tasks.pop_front()
64 };
65 let Some(task) = task else {
66 break;
67 };
68 task();
69 }
70 }
71
72 fn run_async(&self, task: Task) {
73 let mut lock = self.0.write().unwrap();
74 lock.tasks.push_back(task);
75 }
76}
77
78#[derive(Clone)]
79pub struct Promise<T>
80where
81 T: Clone,
82{
83 next: Arc<RwLock<Option<Task>>>,
86 wait: WaitPair<T>,
87}
88
89impl<T> Promise<T>
90where
91 T: Clone,
92{
93 pub fn get(&self) -> Result<T, ()> {
94 let (lock, cvar) = &*self.wait;
95 let mut result = lock.lock().unwrap();
96 while result.is_none() {
97 result = cvar.wait(result).unwrap();
98 }
99 result.clone().unwrap()
100 }
101
102 fn next(&self) {
103 let next = {
104 let mut next = self.next.write().unwrap();
105 next.take()
106 };
107 if let Some(next) = next {
108 next();
109 }
110 }
111
112 pub fn on_ready(&self, next: Task) {
113 let result = {
114 let (lock, _) = &*self.wait;
115 lock.lock().unwrap().clone()
116 };
117 match result {
118 Some(_) => next(),
119 None => {
120 *self.next.write().unwrap() = Some(next);
121 }
122 }
123 }
124}
125
126#[derive(Clone)]
127pub struct PromiseBuilder<T>
128where
129 T: Clone,
130{
131 promise: Promise<T>,
132 wait: WaitPair<T>,
133}
134
135impl<T> PromiseBuilder<T>
136where
137 T: Clone,
138{
139 pub fn new() -> Self {
140 let wait = Arc::new((Mutex::new(None), Condvar::new()));
141 Self {
142 promise: Promise {
143 next: Arc::new(RwLock::new(None)),
144 wait: wait.clone(),
145 },
146 wait,
147 }
148 }
149
150 pub fn break_promise(&self) {
151 let (lock, cvar) = &*self.wait;
152 {
153 let mut result = lock.lock().unwrap();
154 *result = Some(Err(()));
155 }
156 self.promise().next();
157 cvar.notify_all();
158 }
159
160 pub fn complete(&self, value: T) {
161 let (lock, cvar) = &*self.wait;
162 {
163 let mut result = lock.lock().unwrap();
164 if result.is_none() {
166 *result = Some(Ok(value));
167 }
168 }
169 self.promise().next();
170 cvar.notify_all();
171 }
172
173 pub fn promise(&self) -> Promise<T> {
174 self.promise.clone()
175 }
176}
177
178pub type Task = std::sync::Arc<dyn Fn() + Send + Sync>;
179
180type WaitPair<T> = Arc<(Mutex<Option<Result<T, ()>>>, Condvar)>;