1use async_trait::async_trait;
10use aura_core::effects::time::{
11 LogicalClockEffects, OrderClockEffects, PhysicalTimeEffects, TimeComparison, TimeError,
12};
13use aura_core::time::{
14 LogicalTime, OrderTime, OrderingPolicy, TimeOrdering, TimeStamp, VectorClock,
15};
16use cfg_if::cfg_if;
17use rand::RngCore;
18#[cfg(not(target_arch = "wasm32"))]
19use std::time::Duration;
20
21#[cfg(target_arch = "wasm32")]
22type MonotonicInstant = web_time::Instant;
23#[cfg(not(target_arch = "wasm32"))]
24type MonotonicInstant = std::time::Instant;
25#[cfg(not(target_arch = "wasm32"))]
26use tokio::time;
27#[cfg(target_arch = "wasm32")]
28use wasm_bindgen::{closure::Closure, JsCast};
29
30cfg_if! {
31 if #[cfg(target_arch = "wasm32")] {
32 use js_sys::Date;
33 use web_sys::window;
34 } else {
35 use std::time::{SystemTime, UNIX_EPOCH};
36 }
37}
38
39#[allow(clippy::disallowed_methods)] pub fn monotonic_now() -> MonotonicInstant {
42 MonotonicInstant::now()
43}
44
45#[derive(Debug, Clone, Default)]
47pub struct PhysicalTimeHandler;
48
49impl PhysicalTimeHandler {
50 pub fn new() -> Self {
52 Self
53 }
54
55 #[allow(clippy::disallowed_methods)] pub fn physical_time_now_ms(&self) -> u64 {
63 #[cfg(target_arch = "wasm32")]
64 {
65 Date::now() as u64
66 }
67 #[cfg(not(target_arch = "wasm32"))]
68 {
69 let now = SystemTime::now()
70 .duration_since(UNIX_EPOCH)
71 .unwrap_or(Duration::ZERO);
72 now.as_millis() as u64
73 }
74 }
75
76 pub async fn sleep_until(&self, target_epoch_secs: u64) {
78 if let Ok(now) = self.physical_time().await {
79 let now_secs = now.ts_ms / 1000;
80 if target_epoch_secs > now_secs {
81 let delta = target_epoch_secs - now_secs;
82 let _ = self.sleep_ms(delta.saturating_mul(1000)).await;
83 }
84 }
85 }
86}
87
88#[async_trait]
89impl PhysicalTimeEffects for PhysicalTimeHandler {
90 #[tracing::instrument(name = "physical_time", level = "trace")]
91 async fn physical_time(&self) -> Result<aura_core::time::PhysicalTime, TimeError> {
92 let ts_ms = self.physical_time_now_ms();
93 let result = aura_core::time::PhysicalTime {
94 ts_ms,
95 uncertainty: None,
96 };
97
98 #[cfg(not(target_arch = "wasm32"))]
100 {
101 let start = monotonic_now();
102 let latency = start.elapsed();
103 tracing::trace!(
104 latency_ns = latency.as_nanos(),
105 "physical_time_access_latency"
106 );
107 }
108
109 Ok(result)
110 }
111
112 async fn sleep_ms(&self, ms: u64) -> Result<(), TimeError> {
113 #[cfg(target_arch = "wasm32")]
114 {
115 let window = window().ok_or(TimeError::ServiceUnavailable)?;
116 let receiver = {
117 let (sender, receiver) = futures::channel::oneshot::channel::<()>();
118 let callback = Closure::once(move || {
119 let _ = sender.send(());
120 });
121 let timeout_ms = i32::try_from(ms.min(i32::MAX as u64)).unwrap_or(i32::MAX);
122 window
123 .set_timeout_with_callback_and_timeout_and_arguments_0(
124 callback.as_ref().unchecked_ref(),
125 timeout_ms,
126 )
127 .map_err(|err| TimeError::OperationFailed {
128 reason: format!("setTimeout failed: {err:?}"),
129 })?;
130 callback.forget();
131 receiver
132 };
133 let _ = receiver.await;
134 }
135 #[cfg(not(target_arch = "wasm32"))]
136 {
137 time::sleep(Duration::from_millis(ms)).await;
138 }
139 Ok(())
140 }
141}
142
143#[async_trait]
146impl aura_core::effects::TimeEffects for PhysicalTimeHandler {}
147
148#[deprecated(
150 note = "Use the runtime-owned LogicalClockService (aura-agent) for stateful logical clocks. \
151 This handler remains as a pure helper."
152)]
153#[derive(Debug, Clone, Default)]
154pub struct LogicalClockHandler;
155
156#[allow(deprecated)]
157impl LogicalClockHandler {
158 pub fn new() -> Self {
160 Self
161 }
162
163 pub fn advance_logical_time(
165 current_vector: &VectorClock,
166 current_scalar: u64,
167 authority: Option<aura_core::types::identifiers::DeviceId>,
168 observed: Option<&VectorClock>,
169 ) -> LogicalTime {
170 let mut next_vector = current_vector.clone();
171 let mut next_scalar = current_scalar;
172
173 if let Some(obs) = observed {
174 for (auth, val) in obs.iter() {
175 let current_count = next_vector.get(auth).copied().unwrap_or(0);
176 next_vector.insert(*auth, current_count.max(*val));
177 }
178 let obs_max = obs.iter().map(|(_, v)| *v).max().unwrap_or(next_scalar);
180 next_scalar = next_scalar.max(obs_max);
181 }
182
183 next_scalar = next_scalar.saturating_add(1);
185 if let Some(auth) = authority {
186 let current_count = next_vector.get(&auth).copied().unwrap_or(0);
187 next_vector.insert(auth, current_count.saturating_add(1));
188 }
189
190 LogicalTime {
191 vector: next_vector,
192 lamport: next_scalar,
193 }
194 }
195}
196
197#[async_trait]
198#[allow(deprecated)]
199impl LogicalClockEffects for LogicalClockHandler {
200 #[tracing::instrument(name = "logical_advance", level = "trace", skip(observed))]
201 #[allow(clippy::disallowed_methods)] async fn logical_advance(
203 &self,
204 observed: Option<&VectorClock>,
205 ) -> Result<LogicalTime, TimeError> {
206 let start = monotonic_now();
207
208 let empty_vector = VectorClock::new();
212 let result = Self::advance_logical_time(&empty_vector, 0, None, observed);
213
214 let latency = start.elapsed();
216 tracing::trace!(
217 latency_ns = latency.as_nanos(),
218 vector_size = result.vector.len(),
219 "logical_advance_latency"
220 );
221
222 Ok(result)
223 }
224
225 #[tracing::instrument(name = "logical_now", level = "trace")]
226 #[allow(clippy::disallowed_methods)] async fn logical_now(&self) -> Result<LogicalTime, TimeError> {
228 let start = monotonic_now();
229
230 let result = LogicalTime {
233 vector: VectorClock::new(),
234 lamport: 0,
235 };
236
237 let latency = start.elapsed();
239 tracing::trace!(
240 latency_ns = latency.as_nanos(),
241 vector_size = result.vector.len(),
242 "logical_now_latency"
243 );
244
245 Ok(result)
246 }
247}
248
249#[derive(Debug, Clone, Default)]
251pub struct OrderClockHandler;
252
253#[async_trait]
254impl OrderClockEffects for OrderClockHandler {
255 #[tracing::instrument(name = "order_time", level = "trace")]
256 #[allow(clippy::disallowed_methods)] async fn order_time(&self) -> Result<OrderTime, TimeError> {
258 let start = monotonic_now();
259
260 let entropy = rand::rngs::OsRng.next_u64().to_le_bytes();
262 let mut hasher = aura_core::hash::hasher();
263 hasher.update(b"ORDER_TIME_TOKEN");
264 hasher.update(&entropy);
265 let hashed = hasher.finalize();
266 let result = OrderTime(hashed);
267
268 let latency = start.elapsed();
270 tracing::trace!(latency_ns = latency.as_nanos(), "order_time_latency");
271
272 Ok(result)
273 }
274}
275
276#[derive(Debug, Clone, Default)]
278pub struct TimeComparisonHandler;
279
280#[async_trait]
281impl TimeComparison for TimeComparisonHandler {
282 async fn compare(&self, a: &TimeStamp, b: &TimeStamp) -> Result<TimeOrdering, TimeError> {
283 Ok(a.compare(b, OrderingPolicy::Native))
284 }
285}