temper_core/
promise.rs

1use crate::SafeGenerator;
2use std::{
3    collections::VecDeque,
4    ops::Deref,
5    sync::{Arc, Condvar, Mutex, RwLock},
6};
7
8pub trait AsyncRunnerTrait: Send + Sync {
9    fn run_all_blocking(&self);
10
11    fn run_async(&self, task: Task);
12}
13
14#[derive(Clone)]
15pub struct AsyncRunner(Arc<dyn AsyncRunnerTrait>);
16
17impl AsyncRunner {
18    pub fn new(selfish: impl AsyncRunnerTrait + 'static) -> Self {
19        Self(Arc::new(selfish))
20    }
21
22    pub fn run_async<T>(&self, gen: std::sync::Arc<dyn Fn() -> SafeGenerator<T> + Send + Sync>)
23    where
24        T: Clone + Send + Sync + 'static,
25    {
26        let gen = gen();
27        // TODO Any way to avoid the extra Arc wrapping?
28        let gen_ignoring_result = Arc::new(move || {
29            let _ = gen.next();
30        });
31        self.0.run_async(gen_ignoring_result);
32    }
33}
34
35impl Deref for AsyncRunner {
36    type Target = dyn AsyncRunnerTrait;
37    fn deref(&self) -> &Self::Target {
38        &*self.0
39    }
40}
41
42struct SingleThreadAsyncRunnerStruct {
43    tasks: VecDeque<Task>,
44}
45
46pub struct SingleThreadAsyncRunner(Arc<RwLock<SingleThreadAsyncRunnerStruct>>);
47
48impl SingleThreadAsyncRunner {
49    pub fn new() -> AsyncRunner {
50        AsyncRunner::new(SingleThreadAsyncRunner(Arc::new(RwLock::new(
51            SingleThreadAsyncRunnerStruct {
52                tasks: VecDeque::new(),
53            },
54        ))))
55    }
56}
57
58impl AsyncRunnerTrait for SingleThreadAsyncRunner {
59    fn run_all_blocking(&self) {
60        loop {
61            let task = {
62                let mut lock = self.0.write().unwrap();
63                lock.tasks.pop_front()
64            };
65            let Some(task) = task else {
66                break;
67            };
68            task();
69        }
70    }
71
72    fn run_async(&self, task: Task) {
73        let mut lock = self.0.write().unwrap();
74        lock.tasks.push_back(task);
75    }
76}
77
78#[derive(Clone)]
79pub struct Promise<T>
80where
81    T: Clone,
82{
83    // Provide only one bonus listener for now. TODO Do we need more?
84    // TODO Just take a Generator directly instead of a function?
85    next: Arc<RwLock<Option<Task>>>,
86    wait: WaitPair<T>,
87}
88
89impl<T> Promise<T>
90where
91    T: Clone,
92{
93    pub fn get(&self) -> Result<T, ()> {
94        let (lock, cvar) = &*self.wait;
95        let mut result = lock.lock().unwrap();
96        while result.is_none() {
97            result = cvar.wait(result).unwrap();
98        }
99        result.clone().unwrap()
100    }
101
102    fn next(&self) {
103        let next = {
104            let mut next = self.next.write().unwrap();
105            next.take()
106        };
107        if let Some(next) = next {
108            next();
109        }
110    }
111
112    pub fn on_ready(&self, next: Task) {
113        let result = {
114            let (lock, _) = &*self.wait;
115            lock.lock().unwrap().clone()
116        };
117        match result {
118            Some(_) => next(),
119            None => {
120                *self.next.write().unwrap() = Some(next);
121            }
122        }
123    }
124}
125
126#[derive(Clone)]
127pub struct PromiseBuilder<T>
128where
129    T: Clone,
130{
131    promise: Promise<T>,
132    wait: WaitPair<T>,
133}
134
135impl<T> PromiseBuilder<T>
136where
137    T: Clone,
138{
139    pub fn new() -> Self {
140        let wait = Arc::new((Mutex::new(None), Condvar::new()));
141        Self {
142            promise: Promise {
143                next: Arc::new(RwLock::new(None)),
144                wait: wait.clone(),
145            },
146            wait,
147        }
148    }
149
150    pub fn break_promise(&self) {
151        let (lock, cvar) = &*self.wait;
152        {
153            let mut result = lock.lock().unwrap();
154            *result = Some(Err(()));
155        }
156        self.promise().next();
157        cvar.notify_all();
158    }
159
160    pub fn complete(&self, value: T) {
161        let (lock, cvar) = &*self.wait;
162        {
163            let mut result = lock.lock().unwrap();
164            // TODO Still notify below if previously set?
165            if result.is_none() {
166                *result = Some(Ok(value));
167            }
168        }
169        self.promise().next();
170        cvar.notify_all();
171    }
172
173    pub fn promise(&self) -> Promise<T> {
174        self.promise.clone()
175    }
176}
177
/// A shareable, thread-safe, argument-less callback that can be invoked any number of times.
pub type Task = Arc<dyn Fn() + Send + Sync>;
179
/// Shared result slot plus the condvar used to wake threads blocked on it.
///
/// The slot is `None` until an outcome is stored; `Err(())` marks a broken promise.
type WaitPair<T> = Arc<(Mutex<Option<Result<T, ()>>>, Condvar)>;