use std::{fmt::Debug, future::Future};
use atomic_float::AtomicF64;
use tokio::sync::oneshot;
use tracing::warn;
/// Timestamp of the frame currently being processed. `Bind` reads it to stamp
/// start/completion times and to detect the first access within a frame.
/// The value is compared against durations in seconds (see `request_every_sec`).
// NOTE(review): nothing in this file stores into it — presumably the host's
// frame loop updates it once per frame; confirm against the caller.
pub static CURR_FRAME: AtomicF64 = AtomicF64::new(0.0);
/// Timestamp of the previous frame. `Bind::was_drawn_last_frame` compares it
/// with the bind's previous access time.
// NOTE(review): also written outside this file — confirm it is updated together
// with `CURR_FRAME`.
pub static LAST_FRAME: AtomicF64 = AtomicF64::new(0.0);
/// Shared Tokio runtime that `Bind::request` spawns onto on non-wasm targets.
/// It is created lazily on first access.
///
/// # Panics
/// The first access panics if the runtime cannot be constructed.
#[cfg(not(target_family = "wasm"))]
pub static ASYNC_RUNTIME: std::sync::LazyLock<tokio::runtime::Runtime> =
std::sync::LazyLock::new(|| {
tokio::runtime::Runtime::new().expect("Failed to create Tokio runtime.")
});
/// Optional egui context. When it has been set, a finished request asks it for
/// a repaint (see `Bind::req_inner`). Only present with the `egui` feature.
// NOTE(review): this file never sets it — the application is expected to call
// `CTX.set(..)` itself; confirm.
#[cfg(feature = "egui")]
pub static CTX: std::sync::OnceLock<egui::Context> = std::sync::OnceLock::new();
/// Coarse lifecycle of a `Bind`, without access to the stored result.
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq)]
pub enum State {
    /// No request is being awaited (the initial state).
    #[default]
    Idle,
    /// A request has been started and its result has not been received yet.
    Pending,
    /// A result was received or filled in.
    Finished,
}
/// Snapshot of a bind's state that also borrows the stored result, split into
/// success and failure variants.
pub enum StateWithData<'a, T, E> {
    /// No request is being awaited and no result is exposed.
    Idle,
    /// A request is in progress.
    Pending,
    /// The request finished with `Ok`; borrows the value.
    Finished(&'a T),
    /// The request finished with `Err`; borrows the error.
    Failed(&'a E),
}

impl<T: Debug, E: Debug> Debug for StateWithData<'_, T, E> {
    /// Formats unit variants as their bare name and data variants as a
    /// one-field tuple (`Finished(..)` / `Failed(..)`).
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Reduce each variant to a label plus an optional payload, then render once.
        let (label, payload): (&str, Option<&dyn Debug>) = match *self {
            StateWithData::Idle => ("Idle", None),
            StateWithData::Pending => ("Pending", None),
            StateWithData::Finished(value) => ("Finished", Some(value as &dyn Debug)),
            StateWithData::Failed(error) => ("Failed", Some(error as &dyn Debug)),
        };
        match payload {
            None => f.write_str(label),
            Some(inner) => f.debug_tuple(label).field(inner).finish(),
        }
    }
}
bitflags::bitflags! {
    /// Behaviour switches for a `Bind`.
    #[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
    pub struct ConfigFlags: u8 {
        /// Keep data and any running request even if the bind was not
        /// accessed during the previous frame.
        const RETAIN = 1 << 0;
        /// Cancel the spawned task when a request is aborted, instead of only
        /// dropping its receiver.
        const ABORT = 1 << 1;
    }
}
impl Default for ConfigFlags {
/// Returns the empty set: neither `RETAIN` nor `ABORT` is enabled.
fn default() -> Self {
Self::empty()
}
}
/// Bookkeeping for one running request: the channel its result arrives on
/// and, on non-wasm targets, a handle that can cancel the spawned task.
struct InFlight<T, E> {
/// Receives the single result sent by the spawned future.
recv: oneshot::Receiver<Result<T, E>>,
/// Abort handle of the Tokio task; not stored on wasm targets.
#[cfg(not(target_family = "wasm"))]
handle: tokio::task::AbortHandle,
}
impl<T, E> Debug for InFlight<T, E> {
    /// Prints a placeholder for the receiver (so `T`/`E` need not be `Debug`)
    /// and, on non-wasm targets, the abort handle.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let mut builder = f.debug_struct("InFlight");
        builder.field("recv", &"oneshot::Receiver<...>");
        #[cfg(not(target_family = "wasm"))]
        {
            builder.field("handle", &self.handle);
        }
        builder.finish()
    }
}
impl<T, E> InFlight<T, E> {
/// Cancels the spawned task through its abort handle. On wasm targets no
/// handle is stored and this does nothing.
fn abort(&self) {
#[cfg(not(target_family = "wasm"))]
self.handle.abort();
}
/// Non-blocking check of the result channel; forwards the outcome of
/// `oneshot::Receiver::try_recv` unchanged.
fn poll_result(&mut self) -> Result<Result<T, E>, oneshot::error::TryRecvError> {
self.recv.try_recv()
}
}
/// Holds the state and result of an asynchronous request so that code running
/// once per frame can start the request and read its result without blocking.
/// Most accessors call `poll` first, which receives a finished result.
pub struct Bind<T, E> {
/// `CURR_FRAME` value at the most recent `poll` that did any work.
drawn_time_last: f64,
/// Value `drawn_time_last` held before that; compared with `LAST_FRAME`.
drawn_time_prev: f64,
/// Most recent result, if one is stored.
pub(crate) data: Option<Result<T, E>>,
/// Receiver (and abort handle) of the request currently running, if any.
in_flight: Option<InFlight<T, E>>,
/// Current lifecycle state.
pub(crate) state: State,
/// `CURR_FRAME` value when a request was last started or data was last filled.
last_start_time: f64,
/// `CURR_FRAME` value when a result last arrived or was filled; `f64::MIN`
/// until then.
last_complete_time: f64,
/// Behaviour flags (`RETAIN`, `ABORT`).
pub config: ConfigFlags,
/// Number of requests started through `request`.
times_executed: usize,
}
impl<T, E> Debug for Bind<T, E> {
    /// Prints the bookkeeping fields. The stored data is shown only as a
    /// `"Some(...)"` / `"None"` placeholder so `T` and `E` need not be `Debug`.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let data_summary = if self.data.is_some() {
            "Some(...)"
        } else {
            "None"
        };

        let mut builder = f.debug_struct("Bind");
        builder
            .field("state", &self.state)
            .field("config", &self.config)
            .field("drawn_time_last", &self.drawn_time_last)
            .field("drawn_time_prev", &self.drawn_time_prev)
            .field("last_start_time", &self.last_start_time)
            .field("last_complete_time", &self.last_complete_time)
            .field("times_executed", &self.times_executed)
            .field("data", &data_summary);
        match &self.in_flight {
            Some(task) => builder.field("in_flight", task),
            None => builder.field("in_flight", &"None"),
        };
        builder.finish()
    }
}
impl<T: 'static, E: 'static> Default for Bind<T, E> {
/// Equivalent to `Bind::new(false)`: an idle bind without the `RETAIN` flag.
fn default() -> Self {
Self::new(false)
}
}
/// Bound that requires `Send` on non-wasm targets, where requests are spawned
/// on the shared Tokio runtime.
#[cfg(not(target_family = "wasm"))]
pub trait MaybeSend: Send {}
// Blanket impl: on non-wasm targets every `Send` type is `MaybeSend`.
#[cfg(not(target_family = "wasm"))]
impl<T: Send> MaybeSend for T {}
/// Bound that requires nothing on wasm targets, where requests are spawned
/// with `spawn_local`.
#[cfg(target_family = "wasm")]
pub trait MaybeSend {}
// Blanket impl: on wasm targets every type is `MaybeSend`.
#[cfg(target_family = "wasm")]
impl<T> MaybeSend for T {}
impl<T: 'static, E: 'static> Bind<T, E> {
/// Creates an idle bind with no data and no running request.
///
/// `retain` selects whether the `RETAIN` flag starts enabled. The completion
/// time starts at `f64::MIN`, so "time since completion" is very large until a
/// first result arrives.
#[must_use]
pub const fn new(retain: bool) -> Self {
    let config = if retain {
        ConfigFlags::RETAIN
    } else {
        ConfigFlags::empty()
    };
    Self {
        state: State::Idle,
        data: None,
        in_flight: None,
        config,
        times_executed: 0,
        drawn_time_last: 0.0,
        drawn_time_prev: 0.0,
        last_start_time: 0.0,
        last_complete_time: f64::MIN,
    }
}
/// Returns whether the `RETAIN` flag is set. Without it, `poll` clears the
/// bind when it was not accessed during the previous frame.
#[must_use]
pub const fn retain(&self) -> bool {
self.config.contains(ConfigFlags::RETAIN)
}
/// Enables (`true`) or disables (`false`) the `RETAIN` flag; other flags are
/// left untouched.
pub fn set_retain(&mut self, retain: bool) {
    self.config.set(ConfigFlags::RETAIN, retain);
}
/// Returns whether the `ABORT` flag is set. With it, `abort` cancels the
/// spawned task; without it, `abort` only drops the receiver.
#[must_use]
pub const fn abort_on_clear(&self) -> bool {
self.config.contains(ConfigFlags::ABORT)
}
/// Enables (`true`) or disables (`false`) the `ABORT` flag; other flags are
/// left untouched.
pub fn set_abort(&mut self, abort: bool) {
    self.config.set(ConfigFlags::ABORT, abort);
}
/// Task body for a request: awaits `fut` and sends its result through `tx`.
///
/// If the receiver is already gone the result is discarded and a warning is
/// logged. After a successful send, a repaint is requested from the egui
/// context when the `egui` feature is enabled and `CTX` has been set.
async fn req_inner<F>(fut: F, tx: oneshot::Sender<Result<T, E>>)
where
    F: Future<Output = Result<T, E>> + 'static,
    T: MaybeSend,
{
    let outcome = fut.await;
    if tx.send(outcome).is_err() {
        warn!("Future result was dropped because the receiver was gone.");
        return;
    }
    #[cfg(feature = "egui")]
    if let Some(ctx) = CTX.get() {
        ctx.request_repaint();
    }
}
/// Starts `f` as a new background request and marks the bind `Pending`.
///
/// The bind is polled first. Any request already running is then detached via
/// `abort` (and cancelled only if the `ABORT` flag is set). The future is
/// spawned on `ASYNC_RUNTIME` on non-wasm targets and with `spawn_local` on
/// wasm. This method does not itself clear stored data.
pub fn request<Fut>(&mut self, f: Fut)
where
    Fut: Future<Output = Result<T, E>> + MaybeSend + 'static,
    T: MaybeSend,
    E: MaybeSend,
{
    self.poll();
    self.abort();
    self.last_start_time = CURR_FRAME.load(std::sync::atomic::Ordering::Relaxed);

    let (tx, rx) = oneshot::channel();
    tracing::trace!("spawning async request #{}", self.times_executed + 1);
    let worker = Self::req_inner(f, tx);

    #[cfg(not(target_family = "wasm"))]
    let in_flight = InFlight {
        handle: ASYNC_RUNTIME.spawn(worker).abort_handle(),
        recv: rx,
    };
    #[cfg(target_family = "wasm")]
    let in_flight = {
        wasm_bindgen_futures::spawn_local(worker);
        InFlight { recv: rx }
    };

    self.state = State::Pending;
    self.in_flight = Some(in_flight);
    self.times_executed += 1;
}
/// `Duration` front-end for `request_every_sec`: converts `every` to seconds
/// and forwards. Returns that method's result, i.e. the interval minus the
/// time since the last completion (negative when a request is overdue).
#[must_use]
pub fn request_every<Fut>(&mut self, f: impl FnOnce() -> Fut, every: std::time::Duration) -> f64
where
Fut: Future<Output = Result<T, E>> + MaybeSend + 'static,
T: MaybeSend,
E: MaybeSend,
{
self.request_every_sec(f, every.as_secs_f64())
}
/// Starts a new request when none is pending and more than `secs` have passed
/// since the last completion.
///
/// `f` is called only when a request is actually started. Returns
/// `secs - time_since_last_completion`, measured before any new request is
/// started; the value is negative when the interval has already elapsed.
pub fn request_every_sec<Fut>(&mut self, f: impl FnOnce() -> Fut, secs: f64) -> f64
where
    Fut: Future<Output = Result<T, E>> + MaybeSend + 'static,
    T: MaybeSend,
    E: MaybeSend,
{
    // `get_state` polls, so the elapsed time below reflects any fresh result.
    let busy = self.get_state() == State::Pending;
    let since_completed = self.since_completed_raw();
    let overdue = since_completed > secs;
    if overdue && !busy {
        self.request(f());
    }
    secs - since_completed
}
/// Detaches the running request, if any, and returns a `Pending` bind to
/// `Idle`.
///
/// The receiver is always dropped. The spawned task is cancelled only when
/// the `ABORT` flag is set. Stored data is not touched.
pub fn abort(&mut self) {
    let cancel_task = self.config.contains(ConfigFlags::ABORT);
    match self.in_flight.take() {
        Some(task) if cancel_task => task.abort(),
        // Nothing running, or cancellation not requested: just drop it.
        _ => {}
    }
    if self.state == State::Pending {
        self.state = State::Idle;
    }
}
/// Discards the current data and any running request via `clear`, then starts
/// `f` as a new request.
pub fn refresh<Fut>(&mut self, f: Fut)
where
Fut: Future<Output = Result<T, E>> + MaybeSend + 'static,
T: MaybeSend,
E: MaybeSend,
{
self.clear();
self.request(f);
}
/// Moves the stored result out when the bind is `Finished`, leaving it `Idle`
/// with no data. Returns `None` in any other state. Polls first.
///
/// # Panics
/// Panics if the state is `Finished` but no data is stored.
pub fn take(&mut self) -> Option<Result<T, E>> {
    self.poll();
    if !matches!(self.state, State::Finished) {
        return None;
    }
    assert!(
        self.data.is_some(),
        "State was Finished but data was None. This indicates a bug."
    );
    self.state = State::Idle;
    self.data.take()
}
/// Stores `data` directly, as if a request had just finished with it.
///
/// Any running request and previous data are discarded via `clear`. The bind
/// becomes `Finished`, and both the start and completion times are set to the
/// current frame. A warning is logged if the bind already completed or
/// started something during this frame.
pub fn fill(&mut self, data: Result<T, E>) {
    let touched_this_frame = self.just_completed() || self.just_started();
    if touched_this_frame {
        tracing::warn!(
            "Bind::fill called multiple times in the same frame. This may indicate a logic error in your update loop."
        );
    }

    self.clear();
    let frame_time = CURR_FRAME.load(std::sync::atomic::Ordering::Relaxed);
    self.data = Some(data);
    self.last_complete_time = frame_time;
    self.last_start_time = frame_time;
    self.state = State::Finished;
}
/// Borrows the stored value when the stored result is `Ok`; otherwise `None`.
/// Polls first.
#[must_use]
pub fn ok_ref(&mut self) -> Option<&T> {
    self.poll();
    match &self.data {
        Some(Ok(value)) => Some(value),
        _ => None,
    }
}
/// Borrows the stored error when the stored result is `Err`; otherwise `None`.
/// Polls first.
#[must_use]
pub fn err_ref(&mut self) -> Option<&E> {
    self.poll();
    match &self.data {
        Some(Err(error)) => Some(error),
        _ => None,
    }
}
/// Moves the stored value out when the stored result is `Ok`. Polls first.
///
/// Returns `None` and leaves the bind unchanged when there is no data or the
/// stored result is an `Err`. After a value is taken, a `Finished` bind goes
/// back to `Idle`.
pub fn take_ok(&mut self) -> Option<T> {
    self.poll();
    if !matches!(self.data, Some(Ok(_))) {
        return None;
    }
    // Only a Finished bind returns to Idle. Data from an earlier run can still
    // be stored while a newer request is Pending; forcing Idle there would
    // leave `in_flight` set but never polled, so the new result would be lost.
    if matches!(self.state, State::Finished) {
        self.state = State::Idle;
    }
    self.data.take().and_then(Result::ok)
}
/// Polls, then reports whether the bind is `Idle`.
pub fn is_idle(&mut self) -> bool {
    self.poll();
    self.state == State::Idle
}
/// Polls, then reports whether a request is still `Pending`.
pub fn is_pending(&mut self) -> bool {
    self.poll();
    self.state == State::Pending
}
/// Polls, then reports whether the bind is `Finished`.
pub fn is_finished(&mut self) -> bool {
    self.poll();
    self.state == State::Finished
}
/// Polls, then reports whether a result is stored and it is `Ok`.
#[must_use]
pub fn is_ok(&mut self) -> bool {
    self.poll();
    self.data.as_ref().is_some_and(Result::is_ok)
}
/// Polls, then reports whether a result is stored and it is `Err`.
#[must_use]
pub fn is_err(&mut self) -> bool {
    self.poll();
    self.data.as_ref().is_some_and(Result::is_err)
}
/// Polls, then reports whether the last completion was stamped with the
/// current frame's timestamp.
// Exact float comparison is intended: both sides are copies of the same
// stored frame timestamp.
#[allow(clippy::float_cmp)]
pub fn just_completed(&mut self) -> bool {
    self.poll();
    let now = CURR_FRAME.load(std::sync::atomic::Ordering::Relaxed);
    now == self.last_complete_time
}
pub fn on_finished(&mut self, f: impl FnOnce(&Result<T, E>)) {
if self.just_completed()
&& let Some(ref d) = self.data
{
f(d);
}
}
/// Polls, then reports whether the last start time was stamped with the
/// current frame's timestamp.
// Exact float comparison is intended: both sides are copies of the same
// stored frame timestamp.
#[allow(clippy::float_cmp)]
pub fn just_started(&mut self) -> bool {
    self.poll();
    let now = CURR_FRAME.load(std::sync::atomic::Ordering::Relaxed);
    now == self.last_start_time
}
/// Polls, then returns the frame timestamp at which a request was last started
/// or data was last filled (`0.0` if that never happened).
pub fn get_start_time(&mut self) -> f64 {
self.poll();
self.last_start_time
}
/// Polls, then returns the frame timestamp at which a result last arrived or
/// was filled (`f64::MIN` if that never happened).
pub fn get_complete_time(&mut self) -> f64 {
self.poll();
self.last_complete_time
}
/// Polls, then returns the last completion time minus the last start time.
/// The value is only meaningful once a request has completed: before that the
/// completion time is still `f64::MIN`, and while a newer request is pending
/// it still refers to the previous completion.
pub fn get_elapsed(&mut self) -> f64 {
self.poll();
self.last_complete_time - self.last_start_time
}
/// Polls, then returns how far the current frame timestamp is past the last
/// start time.
pub fn since_started(&mut self) -> f64 {
self.poll();
CURR_FRAME.load(std::sync::atomic::Ordering::Relaxed) - self.last_start_time
}
/// Polls, then returns how far the current frame timestamp is past the last
/// completion time. Very large while nothing has completed yet.
pub fn since_completed(&mut self) -> f64 {
self.poll();
self.since_completed_raw()
}
/// Same computation as `since_completed` but without polling, for callers that
/// have already polled.
fn since_completed_raw(&self) -> f64 {
CURR_FRAME.load(std::sync::atomic::Ordering::Relaxed) - self.last_complete_time
}
/// Polls, then borrows the stored result slot (`None` when nothing is stored).
pub fn read(&mut self) -> &Option<Result<T, E>> {
self.poll();
&self.data
}
/// Polls, then returns the stored result with references to its `Ok`/`Err`
/// payload, or `None` when nothing is stored.
pub fn read_as_ref(&mut self) -> Option<Result<&T, &E>> {
self.poll();
self.data.as_ref().map(Result::as_ref)
}
/// Polls, then mutably borrows the stored result slot. Writing through the
/// reference does not update `state`.
pub fn read_mut(&mut self) -> &mut Option<Result<T, E>> {
self.poll();
&mut self.data
}
/// Polls, then returns the stored result with mutable references to its
/// `Ok`/`Err` payload, or `None` when nothing is stored.
pub fn read_as_mut(&mut self) -> Option<Result<&mut T, &mut E>> {
self.poll();
self.data.as_mut().map(Result::as_mut)
}
/// Polls, then returns the current lifecycle state without borrowing data.
pub fn get_state(&mut self) -> State {
self.poll();
self.state
}
pub fn state(&mut self) -> StateWithData<'_, T, E> {
self.poll();
match self.state {
State::Idle => StateWithData::Idle,
State::Pending => StateWithData::Pending,
State::Finished => match self.data.as_ref() {
Some(Ok(data)) => StateWithData::Finished(data),
Some(Err(err)) => StateWithData::Failed(err),
None => {
self.state = State::Idle;
StateWithData::Idle
}
},
}
}
/// Like `state`, but first starts a request from `f` when the bind is `Idle`
/// and holds no data. `f` is called only in that case.
pub fn state_or_request<Fut>(&mut self, f: impl FnOnce() -> Fut) -> StateWithData<'_, T, E>
where
    Fut: Future<Output = Result<T, E>> + MaybeSend + 'static,
    T: MaybeSend,
    E: MaybeSend,
{
    self.poll();
    let should_start = self.state == State::Idle && self.data.is_none();
    if should_start {
        self.request(f());
    }
    self.state()
}
/// Resets the bind to `Idle` with no data. Polls first, then detaches any
/// running request via `abort` (cancelling it only if the `ABORT` flag is set).
pub fn clear(&mut self) {
    self.poll();
    self.abort();
    self.data = None;
    self.state = State::Idle;
}
/// Borrows the stored result, first starting a request from `f` when the bind
/// is `Idle` and holds no data. `f` is called only in that case. Returns
/// `None` while no result is stored.
pub fn read_or_request<Fut>(&mut self, f: impl FnOnce() -> Fut) -> Option<&Result<T, E>>
where
    Fut: Future<Output = Result<T, E>> + MaybeSend + 'static,
    T: MaybeSend,
    E: MaybeSend,
{
    self.poll();
    let should_start = self.state == State::Idle && self.data.is_none();
    if should_start {
        self.request(f());
    }
    self.data.as_ref()
}
/// Mutable counterpart of `read_or_request`: mutably borrows the stored
/// result, first starting a request from `f` when the bind is `Idle` and holds
/// no data. Returns `None` while no result is stored.
pub fn read_mut_or_request<Fut>(&mut self, f: impl FnOnce() -> Fut) -> Option<&mut Result<T, E>>
where
    Fut: Future<Output = Result<T, E>> + MaybeSend + 'static,
    T: MaybeSend,
    E: MaybeSend,
{
    self.poll();
    let should_start = self.state == State::Idle && self.data.is_none();
    if should_start {
        self.request(f());
    }
    self.data.as_mut()
}
pub fn poll(&mut self) {
let curr_frame = CURR_FRAME.load(std::sync::atomic::Ordering::Relaxed);
#[allow(clippy::float_cmp)]
if curr_frame == self.drawn_time_last {
return;
}
self.drawn_time_prev = self.drawn_time_last;
self.drawn_time_last = curr_frame;
if !self.retain() && !self.was_drawn_last_frame() {
self.abort();
self.state = State::Idle;
self.data = None;
}
if matches!(self.state, State::Pending) {
let task = self
.in_flight
.as_mut()
.expect("BUG: Pending but no in_flight.");
match task.poll_result() {
Ok(result) => {
self.data = Some(result);
self.last_complete_time = CURR_FRAME.load(std::sync::atomic::Ordering::Relaxed);
self.state = State::Finished;
self.in_flight = None; }
Err(oneshot::error::TryRecvError::Empty) => {
}
Err(oneshot::error::TryRecvError::Closed) => {
tracing::warn!(
"Async task cancelled: sender dropped without sending a result."
);
self.state = State::Idle;
self.in_flight = None;
}
}
}
}
/// Reports whether `poll` has already recorded an access for the current
/// frame timestamp. Does not poll.
#[allow(clippy::float_cmp)]
pub fn was_drawn_this_frame(&self) -> bool {
self.drawn_time_last == CURR_FRAME.load(std::sync::atomic::Ordering::Relaxed)
}
/// Reports whether the access recorded before the most recent one carries the
/// previous frame's timestamp (`LAST_FRAME`). Does not poll.
#[allow(clippy::float_cmp)]
pub fn was_drawn_last_frame(&self) -> bool {
self.drawn_time_prev == LAST_FRAME.load(std::sync::atomic::Ordering::Relaxed)
}
/// Returns how many requests have been started through `request` over the
/// lifetime of this bind.
pub const fn count_executed(&self) -> usize {
self.times_executed
}
}