pdk_classy/
timer.rs

1// Copyright (c) 2025, Salesforce, Inc.,
2// All rights reserved.
3// For full license text, see the LICENSE.txt file
4
5//! Timer and clock management for async tasks.
6
7use futures::{Stream, StreamExt};
8use std::time::SystemTime;
9use std::{
10    pin::Pin,
11    rc::Rc,
12    task::{Context, Poll},
13    time::Duration,
14};
15
16use crate::{
17    extract::{context::ConfigureContext, AlreadyExtracted, FromContext},
18    host::Host,
19    reactor::root::{RootReactor, TickerId},
20    types::Cid,
21};
22
23/// Injectable clock to create timers.
24pub struct Clock {
25    host: Rc<dyn Host>,
26    root_reactor: Rc<RootReactor>,
27}
28
29/// Creates a new [`Timer`] with the given tick period.
30impl Clock {
31    pub fn period(self, period: Duration) -> Timer {
32        self.host.set_tick_period(period);
33        Timer { clock: Some(self) }
34    }
35}
36
37impl FromContext<ConfigureContext> for Clock {
38    type Error = AlreadyExtracted<Clock>;
39
40    fn from_context(context: &ConfigureContext) -> Result<Self, Self::Error> {
41        // Only one instance can be created
42        context.extract_unique(|| Self {
43            host: context.host.clone(),
44            root_reactor: context.root_reactor.clone(),
45        })
46    }
47}
48
49/// A timer that can be used to await for a fixed time interval.
50pub struct Timer {
51    clock: Option<Clock>,
52}
53
54impl Timer {
55    fn ticks(&self) -> Ticks<'_> {
56        Ticks {
57            id_and_cid: None,
58            count: 0,
59            ticker: self,
60        }
61    }
62
63    /// Returns a reference to the inner [Clock].
64    fn clock(&self) -> &Clock {
65        self.clock.as_ref().unwrap()
66    }
67
68    /// Waits for the next tick.
69    pub async fn next_tick(&self) -> bool {
70        self.ticks().next().await.is_some()
71    }
72
73    /// Waits for the first tick after `interval`.
74    pub async fn sleep(&self, interval: Duration) -> bool {
75        let mut now = SystemTime::now();
76        let release_time = now + interval;
77        let mut slept = true;
78        while now < release_time && slept {
79            slept = self.next_tick().await;
80            now = SystemTime::now();
81        }
82
83        slept
84    }
85
86    /// Resets the tick period
87    fn reset(&self) {
88        if let Some(clock) = self.clock.as_ref() {
89            clock.host.set_tick_period(Duration::from_nanos(0));
90        }
91    }
92
93    /// Consumes this [Timer] and releases the [Clock].
94    pub fn release(self) -> Clock {
95        self.reset();
96        let mut timer = self;
97        timer.clock.take().unwrap()
98    }
99}
100
101impl Drop for Timer {
102    fn drop(&mut self) {
103        self.reset();
104    }
105}
106
107#[doc(hidden)]
108pub struct Ticks<'a> {
109    id_and_cid: Option<(TickerId, Cid)>,
110    count: usize,
111    ticker: &'a Timer,
112}
113
114impl Ticks<'_> {
115    fn detach(&mut self) {
116        if let Some((id, _)) = &self.id_and_cid {
117            self.ticker.clock().root_reactor.remove_ticker(*id);
118            self.id_and_cid = None;
119        }
120    }
121}
122
123impl Drop for Ticks<'_> {
124    fn drop(&mut self) {
125        self.detach();
126    }
127}
128
129impl Stream for Ticks<'_> {
130    type Item = usize;
131
132    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
133        let this = &mut *self.as_mut();
134        let reactor = this.ticker.clock().root_reactor.as_ref();
135
136        if reactor.done() {
137            self.detach();
138            return Poll::Ready(None);
139        }
140
141        match &this.id_and_cid {
142            None => {
143                // Register the waker in the reactor.
144                let id = reactor.insert_ticker(cx.waker().clone());
145
146                let cid = reactor.active_cid();
147
148                self.id_and_cid = Some((id, cid));
149
150                reactor.set_paused(cid, true);
151
152                Poll::Pending
153            }
154            Some((id, cid)) => {
155                if reactor.consume_tick(*id, cx.waker()) {
156                    let count = this.count;
157                    this.count += 1;
158
159                    reactor.set_active_cid(*cid);
160                    this.ticker.clock().host.set_effective_context(cid.into());
161
162                    Poll::Ready(Some(count))
163                } else {
164                    Poll::Pending
165                }
166            }
167        }
168    }
169}