use crate::Deadline;
use parking_lot::Mutex;
use std::{
fmt, mem,
pin::Pin,
sync::{
Arc,
atomic::{AtomicBool, AtomicUsize, Ordering},
},
task::Waker,
time::Duration,
};
use zng_app_context::app_local;
use zng_handle::{Handle, HandleOwner, WeakHandle};
use zng_time::{DInstant, INSTANT, INSTANT_APP};
use zng_var::{Var, WeakVar, var};
use crate::{
LoopTimer,
handler::{AppHandler, AppHandlerArgs, AppWeakHandle},
update::UPDATES,
};
struct DeadlineHandlerEntry {
handle: HandleOwner<DeadlineState>,
handler: Mutex<Box<dyn FnMut(&dyn AppWeakHandle) + Send>>, pending: bool,
}
struct TimerHandlerEntry {
handle: HandleOwner<TimerState>,
handler: Mutex<Box<dyn FnMut(&TimerArgs, &dyn AppWeakHandle) + Send>>, pending: Option<Deadline>, }
struct WaitDeadline {
deadline: Deadline,
wakers: Mutex<Vec<Waker>>,
}
struct WaitDeadlineFut(Arc<WaitDeadline>);
impl Future for WaitDeadlineFut {
type Output = ();
fn poll(self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> std::task::Poll<Self::Output> {
if self.0.deadline.has_elapsed() {
std::task::Poll::Ready(())
} else {
let waker = cx.waker().clone();
self.0.wakers.lock().push(waker);
std::task::Poll::Pending
}
}
}
struct TimerVarEntry {
handle: HandleOwner<TimerState>,
weak_var: WeakVar<Timer>,
}
app_local! {
pub(crate) static TIMERS_SV: TimersService = const { TimersService::new() };
}
pub(crate) struct TimersService {
deadlines: Vec<WeakVar<Deadline>>,
wait_deadlines: Vec<std::sync::Weak<WaitDeadline>>,
timers: Vec<TimerVarEntry>,
deadline_handlers: Vec<DeadlineHandlerEntry>,
timer_handlers: Vec<TimerHandlerEntry>,
has_pending_handlers: bool,
}
impl TimersService {
const fn new() -> Self {
Self {
deadlines: vec![],
wait_deadlines: vec![],
timers: vec![],
deadline_handlers: vec![],
timer_handlers: vec![],
has_pending_handlers: false,
}
}
fn deadline(&mut self, deadline: Deadline) -> DeadlineVar {
let timer = var(deadline);
self.deadlines.push(timer.downgrade());
UPDATES.send_awake();
timer.read_only()
}
fn wait_deadline(&mut self, deadline: Deadline) -> impl Future<Output = ()> + Send + Sync + use<> {
let deadline = Arc::new(WaitDeadline {
deadline,
wakers: Mutex::new(vec![]),
});
self.wait_deadlines.push(Arc::downgrade(&deadline));
UPDATES.send_awake();
WaitDeadlineFut(deadline)
}
fn interval(&mut self, interval: Duration, paused: bool) -> TimerVar {
let (owner, handle) = TimerHandle::new(interval, paused);
let timer = var(Timer(handle));
self.timers.push(TimerVarEntry {
handle: owner,
weak_var: timer.downgrade(),
});
UPDATES.send_awake();
timer.read_only()
}
fn on_deadline<H>(&mut self, deadline: Deadline, mut handler: H) -> DeadlineHandle
where
H: AppHandler<DeadlineArgs>,
{
let (handle_owner, handle) = DeadlineHandle::new(deadline);
self.deadline_handlers.push(DeadlineHandlerEntry {
handle: handle_owner,
handler: Mutex::new(Box::new(move |handle| {
handler.event(
&DeadlineArgs {
timestamp: INSTANT.now(),
deadline,
},
&AppHandlerArgs { handle, is_preview: true },
)
})),
pending: false,
});
UPDATES.send_awake();
handle
}
fn on_interval<H>(&mut self, interval: Duration, paused: bool, mut handler: H) -> TimerHandle
where
H: AppHandler<TimerArgs>,
{
let (owner, handle) = TimerHandle::new(interval, paused);
self.timer_handlers.push(TimerHandlerEntry {
handle: owner,
handler: Mutex::new(Box::new(move |args, handle| {
handler.event(args, &AppHandlerArgs { handle, is_preview: true });
})),
pending: None,
});
UPDATES.send_awake();
handle
}
pub(crate) fn next_deadline(&self, timer: &mut LoopTimer) {
for wk in &self.deadlines {
if let Some(var) = wk.upgrade() {
timer.register(var.get());
}
}
for wk in &self.wait_deadlines {
if let Some(e) = wk.upgrade() {
timer.register(e.deadline);
}
}
for t in &self.timers {
if let Some(var) = t.weak_var.upgrade()
&& !t.handle.is_dropped()
&& !t.handle.data().paused.load(Ordering::Relaxed)
{
var.with(|t| {
let deadline = t.0.0.data().deadline.lock();
timer.register(deadline.current_deadline());
});
}
}
for e in &self.deadline_handlers {
if !e.handle.is_dropped() {
let deadline = e.handle.data().deadline;
timer.register(deadline);
}
}
for t in &self.timer_handlers {
if !t.handle.is_dropped() {
let state = t.handle.data();
if !state.paused.load(Ordering::Relaxed) {
let deadline = state.deadline.lock();
timer.register(deadline.current_deadline());
}
}
}
}
pub(crate) fn has_pending_updates(&self) -> bool {
self.has_pending_handlers
}
pub(crate) fn apply_updates(&mut self, timer: &mut LoopTimer) {
let now = INSTANT.now();
self.deadlines.retain(|wk| {
if let Some(var) = wk.upgrade() {
if !timer.elapsed(var.get()) {
return true; }
var.update();
}
false });
self.wait_deadlines.retain(|wk| {
if let Some(e) = wk.upgrade() {
if !e.deadline.has_elapsed() {
return true; }
for w in mem::take(&mut *e.wakers.lock()) {
w.wake();
}
}
false });
self.timers.retain(|t| {
if let Some(var) = t.weak_var.upgrade()
&& !t.handle.is_dropped()
{
if !t.handle.data().paused.load(Ordering::Relaxed) {
var.with(|t| {
let mut deadline = t.0.0.data().deadline.lock();
if timer.elapsed(deadline.current_deadline()) {
t.0.0.data().count.fetch_add(1, Ordering::Relaxed);
var.update();
deadline.last = now;
timer.register(deadline.current_deadline());
}
})
}
return true; }
false });
self.deadline_handlers.retain_mut(|e| {
if e.handle.is_dropped() {
return false; }
let deadline = e.handle.data().deadline;
e.pending = timer.elapsed(deadline);
self.has_pending_handlers |= e.pending;
true });
self.timer_handlers.retain_mut(|e| {
if e.handle.is_dropped() {
return false; }
let state = e.handle.data();
if !state.paused.load(Ordering::Relaxed) {
let mut deadline = state.deadline.lock();
if timer.elapsed(deadline.current_deadline()) {
state.count.fetch_add(1, Ordering::Relaxed);
e.pending = Some(deadline.current_deadline());
self.has_pending_handlers = true;
deadline.last = now;
timer.register(deadline.current_deadline());
}
}
true });
}
pub(crate) fn notify() {
let _s = tracing::trace_span!("TIMERS").entered();
let _t = INSTANT_APP.pause_for_update();
let mut timers = TIMERS_SV.write();
if !mem::take(&mut timers.has_pending_handlers) {
return;
}
let mut handlers = mem::take(&mut timers.deadline_handlers);
drop(timers);
handlers.retain_mut(|h| {
if h.pending {
(h.handler.get_mut())(&h.handle.weak_handle());
h.handle.data().executed.store(true, Ordering::Relaxed);
}
!h.pending });
let mut timers = TIMERS_SV.write();
handlers.append(&mut timers.deadline_handlers);
timers.deadline_handlers = handlers;
let mut handlers = mem::take(&mut timers.timer_handlers);
drop(timers);
handlers.retain_mut(|h| {
if let Some(deadline) = h.pending.take() {
let args = TimerArgs {
timestamp: INSTANT.now(),
deadline,
wk_handle: h.handle.weak_handle(),
};
(h.handler.get_mut())(&args, &h.handle.weak_handle());
}
!h.handle.is_dropped() });
let mut timers = TIMERS_SV.write();
handlers.append(&mut timers.timer_handlers);
timers.timer_handlers = handlers;
}
}
pub struct TIMERS;
impl TIMERS {
#[must_use]
pub fn deadline(&self, deadline: impl Into<Deadline>) -> DeadlineVar {
TIMERS_SV.write().deadline(deadline.into())
}
#[must_use]
pub fn interval(&self, interval: Duration, paused: bool) -> TimerVar {
TIMERS_SV.write().interval(interval, paused)
}
pub fn on_deadline<H>(&self, deadline: impl Into<Deadline>, handler: H) -> DeadlineHandle
where
H: AppHandler<DeadlineArgs>,
{
TIMERS_SV.write().on_deadline(deadline.into(), handler)
}
pub fn on_interval<H>(&self, interval: Duration, paused: bool, handler: H) -> TimerHandle
where
H: AppHandler<TimerArgs>,
{
TIMERS_SV.write().on_interval(interval, paused, handler)
}
}
impl TIMERS {
pub fn wait_deadline(&self, deadline: impl Into<Deadline>) -> impl Future<Output = ()> + Send + Sync + 'static {
TIMERS_SV.write().wait_deadline(deadline.into())
}
}
pub type DeadlineVar = Var<Deadline>;
#[derive(Clone, PartialEq, Eq, Hash)]
#[repr(transparent)]
#[must_use = "the timer is canceled if the handler is dropped"]
pub struct DeadlineHandle(Handle<DeadlineState>);
struct DeadlineState {
deadline: Deadline,
executed: AtomicBool,
}
impl DeadlineHandle {
pub fn dummy() -> DeadlineHandle {
DeadlineHandle(Handle::dummy(DeadlineState {
deadline: Deadline(DInstant::EPOCH),
executed: AtomicBool::new(false),
}))
}
fn new(deadline: Deadline) -> (HandleOwner<DeadlineState>, Self) {
let (owner, handle) = Handle::new(DeadlineState {
deadline,
executed: AtomicBool::new(false),
});
(owner, DeadlineHandle(handle))
}
pub fn perm(self) {
self.0.perm();
}
pub fn is_permanent(&self) -> bool {
self.0.is_permanent()
}
pub fn cancel(self) {
self.0.force_drop();
}
pub fn deadline(&self) -> Deadline {
self.0.data().deadline
}
pub fn has_executed(&self) -> bool {
self.0.data().executed.load(Ordering::Relaxed)
}
pub fn is_canceled(&self) -> bool {
!self.has_executed() && self.0.is_dropped()
}
pub fn downgrade(&self) -> WeakDeadlineHandle {
WeakDeadlineHandle(self.0.downgrade())
}
}
impl fmt::Debug for DeadlineHandle {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("DeadlineHandle")
.field("deadline", &self.deadline())
.field("handle", &self.0)
.field(
"state",
&if self.has_executed() {
"has_executed"
} else if self.is_canceled() {
"is_canceled"
} else {
"awaiting"
},
)
.finish()
}
}
#[derive(Clone, PartialEq, Eq, Hash, Default, Debug)]
pub struct WeakDeadlineHandle(WeakHandle<DeadlineState>);
impl WeakDeadlineHandle {
pub fn new() -> Self {
Self(WeakHandle::new())
}
pub fn upgrade(&self) -> Option<DeadlineHandle> {
self.0.upgrade().map(DeadlineHandle)
}
}
#[derive(Clone, Debug)]
#[non_exhaustive]
pub struct DeadlineArgs {
pub timestamp: DInstant,
pub deadline: Deadline,
}
#[derive(Clone, PartialEq, Eq, Hash)]
#[repr(transparent)]
#[must_use = "the timer is stopped if the handler is dropped"]
pub struct TimerHandle(Handle<TimerState>);
struct TimerState {
paused: AtomicBool,
deadline: Mutex<TimerDeadline>,
count: AtomicUsize,
}
struct TimerDeadline {
interval: Duration,
last: DInstant,
}
impl TimerDeadline {
fn current_deadline(&self) -> Deadline {
Deadline(self.last + self.interval)
}
}
impl TimerHandle {
fn new(interval: Duration, paused: bool) -> (HandleOwner<TimerState>, TimerHandle) {
let (owner, handle) = Handle::new(TimerState {
paused: AtomicBool::new(paused),
deadline: Mutex::new(TimerDeadline {
interval,
last: INSTANT.now(),
}),
count: AtomicUsize::new(0),
});
(owner, TimerHandle(handle))
}
pub fn dummy() -> TimerHandle {
TimerHandle(Handle::dummy(TimerState {
paused: AtomicBool::new(true),
deadline: Mutex::new(TimerDeadline {
interval: Duration::MAX,
last: DInstant::EPOCH,
}),
count: AtomicUsize::new(0),
}))
}
pub fn perm(self) {
self.0.perm();
}
pub fn is_permanent(&self) -> bool {
self.0.is_permanent()
}
pub fn stop(self) {
self.0.force_drop();
}
pub fn is_stopped(&self) -> bool {
self.0.is_dropped()
}
pub fn interval(&self) -> Duration {
self.0.data().deadline.lock().interval
}
pub fn set_interval(&self, new_interval: Duration) {
self.0.data().deadline.lock().interval = new_interval;
}
pub fn timestamp(&self) -> DInstant {
self.0.data().deadline.lock().last
}
pub fn deadline(&self) -> Deadline {
self.0.data().deadline.lock().current_deadline()
}
pub fn is_paused(&self) -> bool {
self.0.data().paused.load(Ordering::Relaxed)
}
pub fn pause(&self) {
self.0.data().paused.store(true, Ordering::Relaxed);
}
pub fn is_playing(&self) -> bool {
!self.is_paused() && !self.is_stopped()
}
pub fn play(&self, reset: bool) {
self.0.data().paused.store(false, Ordering::Relaxed);
if reset {
self.0.data().deadline.lock().last = INSTANT.now();
}
}
pub fn count(&self) -> usize {
self.0.data().count.load(Ordering::Relaxed)
}
pub fn set_count(&self, count: usize) {
self.0.data().count.store(count, Ordering::Relaxed)
}
pub fn downgrade(&self) -> WeakTimerHandle {
WeakTimerHandle(self.0.downgrade())
}
}
impl fmt::Debug for TimerHandle {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("TimerHandle")
.field("interval", &self.interval())
.field("count", &self.count())
.field("timestamp", &self.timestamp())
.field("handle", &self.0)
.field(
"state",
&if self.is_stopped() {
"is_stopped"
} else if self.is_paused() {
"is_paused"
} else {
"playing"
},
)
.finish()
}
}
#[derive(Clone, PartialEq, Eq, Hash, Default, Debug)]
pub struct WeakTimerHandle(WeakHandle<TimerState>);
impl WeakTimerHandle {
pub fn new() -> Self {
Self(WeakHandle::new())
}
pub fn upgrade(&self) -> Option<TimerHandle> {
self.0.upgrade().map(TimerHandle)
}
}
pub type TimerVar = Var<Timer>;
#[derive(Clone, PartialEq)]
pub struct Timer(TimerHandle);
impl fmt::Debug for Timer {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("Timer")
.field("interval", &self.interval())
.field("count", &self.count())
.field("is_paused", &self.is_paused())
.field("is_stopped", &self.is_stopped())
.finish_non_exhaustive()
}
}
impl Timer {
pub fn stop(&self) {
self.0.clone().stop();
}
pub fn is_stopped(&self) -> bool {
self.0.is_stopped()
}
pub fn interval(&self) -> Duration {
self.0.interval()
}
pub fn set_interval(&self, new_interval: Duration) {
self.0.set_interval(new_interval)
}
pub fn timestamp(&self) -> DInstant {
self.0.timestamp()
}
pub fn deadline(&self) -> Deadline {
self.0.deadline()
}
pub fn is_paused(&self) -> bool {
self.0.is_paused()
}
pub fn is_playing(&self) -> bool {
self.0.is_playing()
}
pub fn pause(&self) {
self.0.pause();
}
pub fn play(&self, reset: bool) {
self.0.play(reset);
}
pub fn count(&self) -> usize {
self.0.count()
}
pub fn set_count(&self, count: usize) {
self.0.set_count(count)
}
}
#[derive(Clone)]
pub struct TimerArgs {
pub timestamp: DInstant,
pub deadline: Deadline,
wk_handle: WeakHandle<TimerState>,
}
impl TimerArgs {
fn handle(&self) -> Option<TimerHandle> {
self.wk_handle.upgrade().map(TimerHandle)
}
pub fn interval(&self) -> Duration {
self.handle().map(|h| h.interval()).unwrap_or_default()
}
pub fn set_interval(&self, new_interval: Duration) {
if let Some(h) = self.handle() {
h.set_interval(new_interval)
}
}
pub fn is_paused(&self) -> bool {
self.handle().map(|h| h.is_paused()).unwrap_or(true)
}
pub fn is_playing(&self) -> bool {
self.handle().map(|h| h.is_playing()).unwrap_or(false)
}
pub fn pause(&self) {
if let Some(h) = self.handle() {
h.pause();
}
}
pub fn play(&self, reset: bool) {
if let Some(h) = self.handle() {
h.play(reset);
}
}
pub fn count(&self) -> usize {
self.handle().map(|h| h.count()).unwrap_or(0)
}
pub fn set_count(&self, count: usize) {
if let Some(h) = self.handle() {
h.set_count(count)
}
}
pub fn last_timestamp(&self) -> DInstant {
self.handle().map(|h| h.timestamp()).unwrap_or(self.timestamp)
}
pub fn next_deadline(&self) -> Deadline {
self.handle().map(|h| h.deadline()).unwrap_or(self.deadline)
}
pub fn is_stopped(&self) -> bool {
self.handle().is_none()
}
}
pub(crate) fn deadline_service(deadline: Deadline) -> Pin<Box<dyn Future<Output = ()> + Send + Sync>> {
Box::pin(TIMERS.wait_deadline(deadline))
}