use futures::{Stream, StreamExt};
use std::time::SystemTime;
use std::{
pin::Pin,
rc::Rc,
task::{Context, Poll},
time::Duration,
};
use crate::{
extract::{context::ConfigureContext, AlreadyExtracted, FromContext},
host::Host,
reactor::root::{RootReactor, TickerId},
types::Cid,
};
use crate::host::clock::Clock as HostClock;
/// Handle used to read the host's current time and to configure its tick period.
///
/// A `Clock` is obtained from a `ConfigureContext` through its `FromContext`
/// implementation and can be turned into a [`Timer`] with [`Clock::period`].
pub struct Clock {
/// Host handle; used to set the tick period and the effective context.
host: Rc<dyn Host>,
/// Host clock queried for the current time.
host_clock: Rc<dyn HostClock>,
/// Root reactor with which tick streams register their wakers.
root_reactor: Rc<RootReactor>,
}
impl Clock {
    /// Sets the host tick period to `period` and converts this clock into a
    /// [`Timer`] that owns it.
    ///
    /// The returned timer resets the host tick period to zero when it is
    /// dropped or released.
    pub fn period(self, period: Duration) -> Timer {
        self.host.set_tick_period(period);
        let clock = Some(self);
        Timer { clock }
    }

    /// Returns the current time as reported by the host clock.
    pub fn now(&self) -> SystemTime {
        let Self { host_clock, .. } = self;
        host_clock.get_current_time()
    }
}
impl FromContext<ConfigureContext> for Clock {
type Error = AlreadyExtracted<Clock>;
fn from_context(context: &ConfigureContext) -> Result<Self, Self::Error> {
context.extract_unique(|| Self {
host: context.host.clone(),
host_clock: context.clock.clone(),
root_reactor: context.root_reactor.clone(),
})
}
}
/// A [`Clock`] with a host tick period configured, created by [`Clock::period`].
///
/// Provides tick-driven waiting (`next_tick`, `sleep`). The host tick period
/// is reset to zero when the timer is dropped or released.
pub struct Timer {
/// The wrapped clock. Stored as an `Option` so that `release` can move it
/// out with `take()` even though `Timer` implements `Drop`.
clock: Option<Clock>,
}
impl Timer {
    /// Creates a fresh, not-yet-registered tick stream borrowing this timer.
    fn ticks(&self) -> Ticks<'_> {
        Ticks {
            id_and_cid: None,
            count: 0,
            ticker: self,
        }
    }

    /// Returns the wrapped clock.
    ///
    /// # Panics
    ///
    /// Panics if the clock has already been moved out. Within this type that
    /// only happens inside `release`, which consumes the timer.
    fn clock(&self) -> &Clock {
        self.clock
            .as_ref()
            .expect("Timer holds its Clock until `release` consumes it")
    }

    /// Returns the current time as reported by the host clock.
    pub fn now(&self) -> SystemTime {
        self.clock().host_clock.get_current_time()
    }

    /// Waits for the next tick.
    ///
    /// Returns `true` when a tick was delivered and `false` when the tick
    /// stream ended, which happens once the root reactor reports it is done.
    pub async fn next_tick(&self) -> bool {
        self.ticks().next().await.is_some()
    }

    /// Waits until at least `interval` has elapsed according to the host clock.
    ///
    /// The current time is only re-checked after each tick, so the wait is
    /// rounded up to the next tick. Returns `true` if the deadline was reached
    /// (immediately so for a zero `interval`) and `false` if the tick stream
    /// ended first.
    ///
    /// If `now + interval` cannot be represented as a `SystemTime`, the
    /// deadline is treated as never reached and the call waits until the tick
    /// stream ends instead of panicking.
    pub async fn sleep(&self, interval: Duration) -> bool {
        let mut now = self.now();
        // `SystemTime + Duration` panics on overflow; `checked_add` yields
        // `None` instead, which is interpreted as "no reachable deadline".
        let release_time = now.checked_add(interval);
        let mut slept = true;
        while slept && release_time.map_or(true, |deadline| now < deadline) {
            slept = self.next_tick().await;
            now = self.now();
        }
        slept
    }

    /// Resets the host tick period to zero, if the clock is still held.
    fn reset(&self) {
        if let Some(clock) = self.clock.as_ref() {
            clock.host.set_tick_period(Duration::from_nanos(0));
        }
    }

    /// Resets the host tick period to zero and returns the wrapped [`Clock`].
    pub fn release(mut self) -> Clock {
        self.reset();
        // After `take()` the field is `None`, so the `reset` performed by
        // `Drop` when `self` goes out of scope does nothing.
        self.clock
            .take()
            .expect("Timer holds its Clock until `release` consumes it")
    }
}
impl Drop for Timer {
    /// Resets the host tick period when the timer goes away.
    ///
    /// This does nothing if the clock has already been moved out by `release`.
    fn drop(&mut self) {
        Self::reset(self);
    }
}
#[doc(hidden)]
/// Stream of ticks for a [`Timer`]; each item is the zero-based index of the
/// tick within this stream.
pub struct Ticks<'a> {
/// Ticker id returned by the root reactor and the reactor's active cid at
/// registration time; `None` until the first poll and after `detach`.
id_and_cid: Option<(TickerId, Cid)>,
/// Number of ticks yielded so far.
count: usize,
/// The timer this stream belongs to.
ticker: &'a Timer,
}
impl Ticks<'_> {
    /// Removes this stream's ticker from the root reactor, if it is registered.
    ///
    /// Calling this again is a no-op, because the stored registration is
    /// cleared as part of detaching.
    fn detach(&mut self) {
        if let Some((ticker_id, _)) = self.id_and_cid.take() {
            let clock = self.ticker.clock();
            clock.root_reactor.remove_ticker(ticker_id);
        }
    }
}
impl Drop for Ticks<'_> {
    /// Removes the ticker from the root reactor if the stream is dropped
    /// while still registered.
    fn drop(&mut self) {
        Self::detach(self);
    }
}
impl Stream for Ticks<'_> {
    type Item = usize;

    /// Polls for the next tick.
    ///
    /// * If the root reactor reports it is done, the ticker is detached and
    ///   the stream ends.
    /// * On the first poll the waker is registered as a ticker, the reactor's
    ///   active cid is recorded and marked paused, and `Pending` is returned.
    /// * On later polls a tick is consumed if one is available. In that case
    ///   the recorded cid is made the reactor's active cid and the host's
    ///   effective context again, and the zero-based tick index is yielded.
    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        let ticks = &mut *self;
        let clock = ticks.ticker.clock();
        let reactor = clock.root_reactor.as_ref();

        if reactor.done() {
            ticks.detach();
            return Poll::Ready(None);
        }

        if let Some((id, cid)) = &ticks.id_and_cid {
            // Already registered: yield only if the reactor has a tick for us.
            if !reactor.consume_tick(*id, cx.waker()) {
                return Poll::Pending;
            }
            let yielded = ticks.count;
            ticks.count = yielded + 1;
            reactor.set_active_cid(*cid);
            clock.host.set_effective_context(cid.into());
            return Poll::Ready(Some(yielded));
        }

        // First poll: register with the reactor and remember the cid that was
        // active at registration time.
        let id = reactor.insert_ticker(cx.waker().clone());
        let cid = reactor.active_cid();
        ticks.id_and_cid = Some((id, cid));
        reactor.set_paused(cid, true);
        Poll::Pending
    }
}