Skip to main content

pdk_classy/
timer.rs

1// Copyright (c) 2026, Salesforce, Inc.,
2// All rights reserved.
3// For full license text, see the LICENSE.txt file
4
5//! Timer and clock management for async tasks.
6
7use futures::{Stream, StreamExt};
8use std::time::SystemTime;
9use std::{
10    pin::Pin,
11    rc::Rc,
12    task::{Context, Poll},
13    time::Duration,
14};
15
16use crate::{
17    extract::{context::ConfigureContext, AlreadyExtracted, FromContext},
18    host::Host,
19    reactor::root::{RootReactor, TickerId},
20    types::Cid,
21};
22
23use crate::host::clock::Clock as HostClock;
24
25/// Injectable clock to create timers.
26pub struct Clock {
27    host: Rc<dyn Host>,
28    host_clock: Rc<dyn HostClock>,
29    root_reactor: Rc<RootReactor>,
30}
31
32/// Creates a new [`Timer`] with the given tick period.
33impl Clock {
34    pub fn period(self, period: Duration) -> Timer {
35        self.host.set_tick_period(period);
36        Timer { clock: Some(self) }
37    }
38}
39
40impl FromContext<ConfigureContext> for Clock {
41    type Error = AlreadyExtracted<Clock>;
42
43    fn from_context(context: &ConfigureContext) -> Result<Self, Self::Error> {
44        // Only one instance can be created
45        context.extract_unique(|| Self {
46            host: context.host.clone(),
47            host_clock: context.clock.clone(),
48            root_reactor: context.root_reactor.clone(),
49        })
50    }
51}
52
53/// A timer that can be used to await for a fixed time interval.
54pub struct Timer {
55    clock: Option<Clock>,
56}
57
58impl Timer {
59    fn ticks(&self) -> Ticks<'_> {
60        Ticks {
61            id_and_cid: None,
62            count: 0,
63            ticker: self,
64        }
65    }
66
67    /// Returns a reference to the inner [Clock].
68    fn clock(&self) -> &Clock {
69        self.clock.as_ref().unwrap()
70    }
71
72    fn now(&self) -> SystemTime {
73        self.clock().host_clock.get_current_time()
74    }
75
76    /// Waits for the next tick.
77    pub async fn next_tick(&self) -> bool {
78        self.ticks().next().await.is_some()
79    }
80
81    /// Waits for the first tick after `interval`.
82    pub async fn sleep(&self, interval: Duration) -> bool {
83        let mut now = self.now();
84        let release_time = now + interval;
85        let mut slept = true;
86        while now < release_time && slept {
87            slept = self.next_tick().await;
88            now = self.now();
89        }
90
91        slept
92    }
93
94    /// Resets the tick period
95    fn reset(&self) {
96        if let Some(clock) = self.clock.as_ref() {
97            clock.host.set_tick_period(Duration::from_nanos(0));
98        }
99    }
100
101    /// Consumes this [Timer] and releases the [Clock].
102    pub fn release(self) -> Clock {
103        self.reset();
104        let mut timer = self;
105        timer.clock.take().unwrap()
106    }
107}
108
109impl Drop for Timer {
110    fn drop(&mut self) {
111        self.reset();
112    }
113}
114
115#[doc(hidden)]
116pub struct Ticks<'a> {
117    id_and_cid: Option<(TickerId, Cid)>,
118    count: usize,
119    ticker: &'a Timer,
120}
121
122impl Ticks<'_> {
123    fn detach(&mut self) {
124        if let Some((id, _)) = &self.id_and_cid {
125            self.ticker.clock().root_reactor.remove_ticker(*id);
126            self.id_and_cid = None;
127        }
128    }
129}
130
131impl Drop for Ticks<'_> {
132    fn drop(&mut self) {
133        self.detach();
134    }
135}
136
137impl Stream for Ticks<'_> {
138    type Item = usize;
139
140    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
141        let this = &mut *self.as_mut();
142        let reactor = this.ticker.clock().root_reactor.as_ref();
143
144        if reactor.done() {
145            self.detach();
146            return Poll::Ready(None);
147        }
148
149        match &this.id_and_cid {
150            None => {
151                // Register the waker in the reactor.
152                let id = reactor.insert_ticker(cx.waker().clone());
153
154                let cid = reactor.active_cid();
155
156                self.id_and_cid = Some((id, cid));
157
158                reactor.set_paused(cid, true);
159
160                Poll::Pending
161            }
162            Some((id, cid)) => {
163                if reactor.consume_tick(*id, cx.waker()) {
164                    let count = this.count;
165                    this.count += 1;
166
167                    reactor.set_active_cid(*cid);
168                    this.ticker.clock().host.set_effective_context(cid.into());
169
170                    Poll::Ready(Some(count))
171                } else {
172                    Poll::Pending
173                }
174            }
175        }
176    }
177}