Skip to main content

aura_effects/
time.rs

1//! Domain time handlers (Layer 3).
2//!
3//! Provides production implementations for:
4//! - PhysicalTimeEffects (system clock + sleep)
5//! - LogicalClockEffects (simple scalar + vector tracking)
6//! - OrderClockEffects (opaque sortable token)
7//! - TimeComparison (delegates to core comparison)
8
9use async_trait::async_trait;
10use aura_core::effects::time::{
11    LogicalClockEffects, OrderClockEffects, PhysicalTimeEffects, TimeComparison, TimeError,
12};
13use aura_core::time::{
14    LogicalTime, OrderTime, OrderingPolicy, TimeOrdering, TimeStamp, VectorClock,
15};
16use cfg_if::cfg_if;
17use rand::RngCore;
18#[cfg(not(target_arch = "wasm32"))]
19use std::time::Duration;
20
21#[cfg(target_arch = "wasm32")]
22type MonotonicInstant = web_time::Instant;
23#[cfg(not(target_arch = "wasm32"))]
24type MonotonicInstant = std::time::Instant;
25#[cfg(not(target_arch = "wasm32"))]
26use tokio::time;
27#[cfg(target_arch = "wasm32")]
28use wasm_bindgen::{closure::Closure, JsCast};
29
30cfg_if! {
31    if #[cfg(target_arch = "wasm32")] {
32        use js_sys::Date;
33        use web_sys::window;
34    } else {
35        use std::time::{SystemTime, UNIX_EPOCH};
36    }
37}
38
39/// Monotonic timestamp helper for layers that need batching or scheduling.
40#[allow(clippy::disallowed_methods)] // Monotonic clock access is permitted in effect handlers
41pub fn monotonic_now() -> MonotonicInstant {
42    MonotonicInstant::now()
43}
44
45/// Production physical clock handler backed by the system clock.
46#[derive(Debug, Clone, Default)]
47pub struct PhysicalTimeHandler;
48
49impl PhysicalTimeHandler {
50    /// Create a new physical clock handler.
51    pub fn new() -> Self {
52        Self
53    }
54
55    /// Synchronous physical time helper (ms since epoch).
56    ///
57    /// This is intended for UI/frontend call sites that are not async and need
58    /// a best-effort timestamp without spawning a runtime. It still sources time
59    /// from the system clock, so simulator-driven tests should prefer the async
60    /// `physical_time` trait method for full control.
61    #[allow(clippy::disallowed_methods)] // Effect implementation reads wall clock directly
62    pub fn physical_time_now_ms(&self) -> u64 {
63        #[cfg(target_arch = "wasm32")]
64        {
65            Date::now() as u64
66        }
67        #[cfg(not(target_arch = "wasm32"))]
68        {
69            let now = SystemTime::now()
70                .duration_since(UNIX_EPOCH)
71                .unwrap_or(Duration::ZERO);
72            now.as_millis() as u64
73        }
74    }
75
76    /// Sleep until a target epoch in seconds (best-effort).
77    pub async fn sleep_until(&self, target_epoch_secs: u64) {
78        if let Ok(now) = self.physical_time().await {
79            let now_secs = now.ts_ms / 1000;
80            if target_epoch_secs > now_secs {
81                let delta = target_epoch_secs - now_secs;
82                let _ = self.sleep_ms(delta.saturating_mul(1000)).await;
83            }
84        }
85    }
86}
87
88#[async_trait]
89impl PhysicalTimeEffects for PhysicalTimeHandler {
90    #[tracing::instrument(name = "physical_time", level = "trace")]
91    async fn physical_time(&self) -> Result<aura_core::time::PhysicalTime, TimeError> {
92        let ts_ms = self.physical_time_now_ms();
93        let result = aura_core::time::PhysicalTime {
94            ts_ms,
95            uncertainty: None,
96        };
97
98        // Record latency metrics
99        #[cfg(not(target_arch = "wasm32"))]
100        {
101            let start = monotonic_now();
102            let latency = start.elapsed();
103            tracing::trace!(
104                latency_ns = latency.as_nanos(),
105                "physical_time_access_latency"
106            );
107        }
108
109        Ok(result)
110    }
111
112    async fn sleep_ms(&self, ms: u64) -> Result<(), TimeError> {
113        #[cfg(target_arch = "wasm32")]
114        {
115            let window = window().ok_or(TimeError::ServiceUnavailable)?;
116            let receiver = {
117                let (sender, receiver) = futures::channel::oneshot::channel::<()>();
118                let callback = Closure::once(move || {
119                    let _ = sender.send(());
120                });
121                let timeout_ms = i32::try_from(ms.min(i32::MAX as u64)).unwrap_or(i32::MAX);
122                window
123                    .set_timeout_with_callback_and_timeout_and_arguments_0(
124                        callback.as_ref().unchecked_ref(),
125                        timeout_ms,
126                    )
127                    .map_err(|err| TimeError::OperationFailed {
128                        reason: format!("setTimeout failed: {err:?}"),
129                    })?;
130                callback.forget();
131                receiver
132            };
133            let _ = receiver.await;
134        }
135        #[cfg(not(target_arch = "wasm32"))]
136        {
137            time::sleep(Duration::from_millis(ms)).await;
138        }
139        Ok(())
140    }
141}
142
143/// Implement TimeEffects for PhysicalTimeHandler using default implementations
144/// (current_timestamp derives from physical_time via the trait default)
145#[async_trait]
146impl aura_core::effects::TimeEffects for PhysicalTimeHandler {}
147
148/// Simple logical clock handler - stateless pure functions for logical clock operations.
149#[deprecated(
150    note = "Use the runtime-owned LogicalClockService (aura-agent) for stateful logical clocks. \
151            This handler remains as a pure helper."
152)]
153#[derive(Debug, Clone, Default)]
154pub struct LogicalClockHandler;
155
156#[allow(deprecated)]
157impl LogicalClockHandler {
158    /// Create a new logical clock handler.
159    pub fn new() -> Self {
160        Self
161    }
162
163    /// Pure function to advance logical time based on observed vector clock.
164    pub fn advance_logical_time(
165        current_vector: &VectorClock,
166        current_scalar: u64,
167        authority: Option<aura_core::types::identifiers::DeviceId>,
168        observed: Option<&VectorClock>,
169    ) -> LogicalTime {
170        let mut next_vector = current_vector.clone();
171        let mut next_scalar = current_scalar;
172
173        if let Some(obs) = observed {
174            for (auth, val) in obs.iter() {
175                let current_count = next_vector.get(auth).copied().unwrap_or(0);
176                next_vector.insert(*auth, current_count.max(*val));
177            }
178            // Find max value in observed vector clock
179            let obs_max = obs.iter().map(|(_, v)| *v).max().unwrap_or(next_scalar);
180            next_scalar = next_scalar.max(obs_max);
181        }
182
183        // Bump the clock
184        next_scalar = next_scalar.saturating_add(1);
185        if let Some(auth) = authority {
186            let current_count = next_vector.get(&auth).copied().unwrap_or(0);
187            next_vector.insert(auth, current_count.saturating_add(1));
188        }
189
190        LogicalTime {
191            vector: next_vector,
192            lamport: next_scalar,
193        }
194    }
195}
196
197#[async_trait]
198#[allow(deprecated)]
199impl LogicalClockEffects for LogicalClockHandler {
200    #[tracing::instrument(name = "logical_advance", level = "trace", skip(observed))]
201    #[allow(clippy::disallowed_methods)] // Effect implementation uses Instant for metrics
202    async fn logical_advance(
203        &self,
204        observed: Option<&VectorClock>,
205    ) -> Result<LogicalTime, TimeError> {
206        let start = monotonic_now();
207
208        // Since this handler is now stateless, return a default logical time
209        // that starts from epoch. In a real application, the caller would need to
210        // track the current logical clock state and pass it to advance_logical_time().
211        let empty_vector = VectorClock::new();
212        let result = Self::advance_logical_time(&empty_vector, 0, None, observed);
213
214        // Record latency metrics
215        let latency = start.elapsed();
216        tracing::trace!(
217            latency_ns = latency.as_nanos(),
218            vector_size = result.vector.len(),
219            "logical_advance_latency"
220        );
221
222        Ok(result)
223    }
224
225    #[tracing::instrument(name = "logical_now", level = "trace")]
226    #[allow(clippy::disallowed_methods)] // Effect implementation uses Instant for metrics
227    async fn logical_now(&self) -> Result<LogicalTime, TimeError> {
228        let start = monotonic_now();
229
230        // Since this handler is now stateless, return epoch logical time.
231        // In a real application, the caller would manage the current logical clock state.
232        let result = LogicalTime {
233            vector: VectorClock::new(),
234            lamport: 0,
235        };
236
237        // Record latency metrics
238        let latency = start.elapsed();
239        tracing::trace!(
240            latency_ns = latency.as_nanos(),
241            vector_size = result.vector.len(),
242            "logical_now_latency"
243        );
244
245        Ok(result)
246    }
247}
248
249/// Opaque order clock handler that emits sortable random tokens.
250#[derive(Debug, Clone, Default)]
251pub struct OrderClockHandler;
252
253#[async_trait]
254impl OrderClockEffects for OrderClockHandler {
255    #[tracing::instrument(name = "order_time", level = "trace")]
256    #[allow(clippy::disallowed_methods)] // This IS the time handler implementation
257    async fn order_time(&self) -> Result<OrderTime, TimeError> {
258        let start = monotonic_now();
259
260        // Order clock must be unpredictable but stateless; use OS entropy here (allowed in L3 handler).
261        let entropy = rand::rngs::OsRng.next_u64().to_le_bytes();
262        let mut hasher = aura_core::hash::hasher();
263        hasher.update(b"ORDER_TIME_TOKEN");
264        hasher.update(&entropy);
265        let hashed = hasher.finalize();
266        let result = OrderTime(hashed);
267
268        // Record latency metrics
269        let latency = start.elapsed();
270        tracing::trace!(latency_ns = latency.as_nanos(), "order_time_latency");
271
272        Ok(result)
273    }
274}
275
276/// Convenience wrapper for comparing timestamps using core policies.
277#[derive(Debug, Clone, Default)]
278pub struct TimeComparisonHandler;
279
280#[async_trait]
281impl TimeComparison for TimeComparisonHandler {
282    async fn compare(&self, a: &TimeStamp, b: &TimeStamp) -> Result<TimeOrdering, TimeError> {
283        Ok(a.compare(b, OrderingPolicy::Native))
284    }
285}