Skip to main content

pdk_classy/
timer.rs

1// Copyright (c) 2026, Salesforce, Inc.,
2// All rights reserved.
3// For full license text, see the LICENSE.txt file
4
5//! Timer and clock management for async tasks.
6
7use futures::{Stream, StreamExt};
8use std::time::SystemTime;
9use std::{
10    pin::Pin,
11    rc::Rc,
12    task::{Context, Poll},
13    time::Duration,
14};
15
16use crate::{
17    extract::{context::ConfigureContext, AlreadyExtracted, FromContext},
18    host::Host,
19    reactor::root::{RootReactor, TickerId},
20    types::Cid,
21};
22
23use crate::host::clock::Clock as HostClock;
24
25/// Injectable clock to create timers.
26pub struct Clock {
27    host: Rc<dyn Host>,
28    host_clock: Rc<dyn HostClock>,
29    root_reactor: Rc<RootReactor>,
30}
31
32/// Creates a new [`Timer`] with the given tick period.
33impl Clock {
34    pub fn period(self, period: Duration) -> Timer {
35        self.host.set_tick_period(period);
36        Timer { clock: Some(self) }
37    }
38
39    /// Returns the current time. Use this method or [`Timer::now()`] instead of `SystemTime::now()`.
40    pub fn now(&self) -> SystemTime {
41        self.host_clock.get_current_time()
42    }
43}
44
45impl FromContext<ConfigureContext> for Clock {
46    type Error = AlreadyExtracted<Clock>;
47
48    fn from_context(context: &ConfigureContext) -> Result<Self, Self::Error> {
49        // Only one instance can be created
50        context.extract_unique(|| Self {
51            host: context.host.clone(),
52            host_clock: context.clock.clone(),
53            root_reactor: context.root_reactor.clone(),
54        })
55    }
56}
57
58/// A timer that can be used to await for a fixed time interval.
59pub struct Timer {
60    clock: Option<Clock>,
61}
62
63impl Timer {
64    fn ticks(&self) -> Ticks<'_> {
65        Ticks {
66            id_and_cid: None,
67            count: 0,
68            ticker: self,
69        }
70    }
71
72    /// Returns a reference to the inner [Clock].
73    fn clock(&self) -> &Clock {
74        self.clock.as_ref().unwrap()
75    }
76
77    /// Returns the current time. Use this method or [`Clock::now()`] instead of `SystemTime::now()`.
78    pub fn now(&self) -> SystemTime {
79        self.clock().host_clock.get_current_time()
80    }
81
82    /// Waits for the next tick.
83    pub async fn next_tick(&self) -> bool {
84        self.ticks().next().await.is_some()
85    }
86
87    /// Waits for the first tick after `interval`.
88    pub async fn sleep(&self, interval: Duration) -> bool {
89        let mut now = self.now();
90        let release_time = now + interval;
91        let mut slept = true;
92        while now < release_time && slept {
93            slept = self.next_tick().await;
94            now = self.now();
95        }
96
97        slept
98    }
99
100    /// Resets the tick period
101    fn reset(&self) {
102        if let Some(clock) = self.clock.as_ref() {
103            clock.host.set_tick_period(Duration::from_nanos(0));
104        }
105    }
106
107    /// Consumes this [Timer] and releases the [Clock].
108    pub fn release(self) -> Clock {
109        self.reset();
110        let mut timer = self;
111        timer.clock.take().unwrap()
112    }
113}
114
115impl Drop for Timer {
116    fn drop(&mut self) {
117        self.reset();
118    }
119}
120
121#[doc(hidden)]
122pub struct Ticks<'a> {
123    id_and_cid: Option<(TickerId, Cid)>,
124    count: usize,
125    ticker: &'a Timer,
126}
127
128impl Ticks<'_> {
129    fn detach(&mut self) {
130        if let Some((id, _)) = &self.id_and_cid {
131            self.ticker.clock().root_reactor.remove_ticker(*id);
132            self.id_and_cid = None;
133        }
134    }
135}
136
137impl Drop for Ticks<'_> {
138    fn drop(&mut self) {
139        self.detach();
140    }
141}
142
143impl Stream for Ticks<'_> {
144    type Item = usize;
145
146    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
147        let this = &mut *self.as_mut();
148        let reactor = this.ticker.clock().root_reactor.as_ref();
149
150        if reactor.done() {
151            self.detach();
152            return Poll::Ready(None);
153        }
154
155        match &this.id_and_cid {
156            None => {
157                // Register the waker in the reactor.
158                let id = reactor.insert_ticker(cx.waker().clone());
159
160                let cid = reactor.active_cid();
161
162                self.id_and_cid = Some((id, cid));
163
164                reactor.set_paused(cid, true);
165
166                Poll::Pending
167            }
168            Some((id, cid)) => {
169                if reactor.consume_tick(*id, cx.waker()) {
170                    let count = this.count;
171                    this.count += 1;
172
173                    reactor.set_active_cid(*cid);
174                    this.ticker.clock().host.set_effective_context(cid.into());
175
176                    Poll::Ready(Some(count))
177                } else {
178                    Poll::Pending
179                }
180            }
181        }
182    }
183}