use std::{rc::Rc, sync::Arc, time::Duration};
use futures::{task::ArcWake, Future};
use once_cell::sync::OnceCell;
use crate::{
platform::PlatformRunLoop, util::FutureCompleter, Handle, JoinHandle, RunLoopSender, Task,
};
pub struct RunLoop {
pub platform_run_loop: Rc<PlatformRunLoop>,
}
static MAIN_THREAD_SENDER: OnceCell<RunLoopSender> = OnceCell::new();
impl RunLoop {
pub(crate) fn new() -> Self {
let res = Self {
platform_run_loop: Rc::new(PlatformRunLoop::new()),
};
if MAIN_THREAD_SENDER.get().is_none() && Self::is_main_thread() {
MAIN_THREAD_SENDER
.set(res.new_sender())
.expect("Main thread sender already set");
}
res
}
#[must_use]
pub fn schedule<F>(&self, in_time: Duration, callback: F) -> Handle
where
F: FnOnce() + 'static,
{
let run_loop = self.platform_run_loop.clone();
let handle = run_loop.schedule(in_time, callback);
Handle::new(move || {
run_loop.unschedule(handle);
})
}
#[must_use]
pub fn schedule_next<F>(&self, callback: F) -> Handle
where
F: FnOnce() + 'static,
{
self.schedule(Duration::from_secs(0), callback)
}
pub async fn wait(&self, duration: Duration) {
let (future, completer) = FutureCompleter::<()>::new();
self.schedule(duration, move || {
completer.complete(());
})
.detach();
future.await
}
pub fn new_sender(&self) -> RunLoopSender {
RunLoopSender::new(self.platform_run_loop.new_sender())
}
pub fn spawn<T: 'static>(&self, future: impl Future<Output = T> + 'static) -> JoinHandle<T> {
let task = Arc::new(Task::new(self.new_sender(), future));
ArcWake::wake_by_ref(&task);
JoinHandle::new(task)
}
pub fn current() -> Self {
thread_local!(static RUN_LOOP: RunLoop = RunLoop::new());
RUN_LOOP.with(|run_loop| RunLoop {
platform_run_loop: run_loop.platform_run_loop.clone(),
})
}
pub fn sender_for_main_thread() -> RunLoopSender {
MAIN_THREAD_SENDER.get().cloned().unwrap_or_else(|| {
if Self::is_main_thread() {
Self::current().new_sender()
} else {
RunLoopSender::new_fallback(PlatformRunLoop::main_thread_fallback_sender())
}
})
}
pub fn is_main_thread() -> bool {
PlatformRunLoop::is_main_thread()
}
pub fn run(&self) {
self.platform_run_loop.run()
}
pub fn stop(&self) {
self.platform_run_loop.stop()
}
#[cfg(any(target_os = "macos", target_os = "linux", target_os = "windows"))]
pub fn run_app(&self) {
self.platform_run_loop.run_app();
}
#[cfg(any(target_os = "macos", target_os = "linux", target_os = "windows"))]
pub fn stop_app(&self) {
self.platform_run_loop.stop_app();
}
}
pub fn spawn<T: 'static>(future: impl Future<Output = T> + 'static) -> JoinHandle<T> {
RunLoop::current().spawn(future)
}
#[cfg(test)]
#[allow(clippy::bool_assert_comparison)]
mod tests {
use crate::{
util::{Capsule, FutureCompleter},
RunLoop,
};
use std::{
cell::RefCell,
rc::Rc,
sync::{Arc, Mutex},
thread,
time::{Duration, Instant},
};
#[test]
fn test_run() {
let rl = Rc::new(RunLoop::new());
let rlc = rl.clone();
let next_called = Rc::new(RefCell::new(false));
let next_called_clone = next_called.clone();
let start = Instant::now();
rl.schedule(Duration::from_millis(50), move || {
next_called_clone.replace(true);
rlc.stop();
})
.detach();
assert_eq!(*next_called.borrow(), false);
rl.run();
assert_eq!(*next_called.borrow(), true);
assert!(start.elapsed() >= Duration::from_millis(50));
}
#[test]
fn test_sender() {
let run_loop = Rc::new(RunLoop::new());
let rl = Arc::new(Mutex::new(Capsule::new(run_loop.clone())));
let sender = run_loop.new_sender();
let stop_called = Arc::new(Mutex::new(false));
let stop_called_clone = stop_called.clone();
run_loop
.schedule_next(|| {
thread::spawn(move || {
sender.send(move || {
let rl = rl.lock().unwrap();
let rl = rl.get_ref().unwrap();
*stop_called_clone.lock().unwrap() = true;
rl.stop();
});
});
})
.detach();
assert_eq!(*stop_called.lock().unwrap(), false);
run_loop.run();
assert_eq!(*stop_called.lock().unwrap(), true);
}
async fn wait(run_loop: Rc<RunLoop>, duration: Duration) {
let (future, completer) = FutureCompleter::<()>::new();
run_loop
.schedule(duration, move || {
completer.complete(());
})
.detach();
future.await
}
#[test]
fn test_async() {
let run_loop = Rc::new(RunLoop::new());
let w = wait(run_loop.clone(), Duration::from_millis(50));
let run_loop_clone = run_loop.clone();
run_loop.spawn(async move {
w.await;
run_loop_clone.stop();
});
let start = Instant::now();
run_loop.run();
assert!(start.elapsed() >= Duration::from_millis(50));
}
}