batpak 0.9.0

Event sourcing with causal graphs and caller-defined gates. Sync API, no async runtime.
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
use crate::store::StoreError;
use std::sync::atomic::{AtomicI64, Ordering};
use std::sync::{Arc, OnceLock};
use std::time::Instant;

/// Time source consulted by the store for timestamps, evidence, and deterministic tests.
///
/// Scope: every clock reading that is written to disk, surfaced in a public
/// report or receipt, or folded into identity (for example the wall-clock bits
/// of a UUIDv7) flows through this trait. Purely process-local deadlines are
/// deliberately excluded and keep using `Instant`: cursor pull waits, frontier
/// waits, and writer idle backoff only measure elapsed time for local
/// scheduling, and never turn into durable bytes or receipt/report identity.
///
/// [`SystemClock`] is the production implementation. Tests and embedders that
/// need reproducible store behavior can supply their own implementation and
/// install it through [`crate::store::StoreConfig::with_clock`].
pub trait Clock: Send + Sync {
    /// Wall-clock time as microseconds since the Unix epoch.
    ///
    /// Implementations are expected to return a non-negative value; a negative
    /// reading is rejected as invalid when it is converted for persistence.
    fn now_us(&self) -> i64;

    /// Wall-clock time as nanoseconds since the Unix epoch, saturating rather
    /// than overflowing.
    fn now_wall_ns(&self) -> i64;

    /// Monotonic nanoseconds that are only meaningful inside the current process.
    fn now_mono_ns(&self) -> i64;

    /// Marker identifying this process's monotonic epoch, recorded alongside
    /// monotonic metadata so readings taken by another process can be detected.
    fn process_boot_ns(&self) -> u64;
}

/// Wall-clock microseconds since the Unix epoch, read from `SystemTime`.
///
/// Never panics. A system clock set before 1970 reads as `0`. A reading too
/// large for `i64` (beyond roughly the year 292,277) saturates to `i64::MAX`,
/// which should be treated as a clock-malfunction signal. Per the original
/// design, downstream cache-staleness checks see that saturated value and force
/// a replay instead of poisoning the process.
fn system_now_us() -> i64 {
    let since_epoch = match std::time::SystemTime::now().duration_since(std::time::UNIX_EPOCH) {
        Ok(elapsed) => elapsed,
        // Clock is earlier than the epoch: report zero rather than failing.
        Err(_) => std::time::Duration::default(),
    };
    i64::try_from(since_epoch.as_micros()).unwrap_or(i64::MAX)
}

/// Converts a clock reading in microseconds since the Unix epoch into the
/// wall-clock milliseconds that get persisted.
///
/// Custom clocks are required to produce non-negative microsecond values. A
/// negative reading is reported as invalid caller input instead of panicking,
/// because this conversion runs on the append/batch hot paths.
///
/// # Errors
///
/// Returns [`StoreError::InvalidClock`] when `timestamp_us` is negative.
pub(crate) fn wall_ms_from_timestamp_us(timestamp_us: i64) -> Result<u64, StoreError> {
    // `u64::try_from` fails exactly for negative inputs, which doubles as the
    // validity check.
    match u64::try_from(timestamp_us) {
        Ok(micros) => Ok(micros / 1000),
        Err(_) => Err(StoreError::InvalidClock {
            timestamp_us,
            reason: "timestamp_us must be >= 0 microseconds since Unix epoch".into(),
        }),
    }
}

/// Wall-clock nanoseconds since the Unix epoch, read from `SystemTime`.
///
/// A system clock set before 1970 reads as `0`. A value beyond `i64::MAX`
/// nanoseconds (around the year 2262) saturates to `i64::MAX` instead of
/// overflowing.
fn system_now_wall_ns_saturating() -> i64 {
    std::time::SystemTime::now()
        .duration_since(std::time::UNIX_EPOCH)
        .map_or(0, |since_epoch| {
            i64::try_from(since_epoch.as_nanos()).unwrap_or(i64::MAX)
        })
}

/// Process-wide anchor for monotonic time, created lazily on first use.
///
/// It pairs two values captured together:
///   1. `anchor_instant`: the `Instant` recorded when the anchor was created.
///      Monotonic readings are the time elapsed since this instant.
///   2. `anchor_boot_ns`: a `u64` marker naming *this* process's monotonic
///      epoch. Any monotonic value persisted to disk and later read back by a
///      different process MUST have its `process_boot_ns` compared against
///      this marker. A mismatch means the value was measured against another
///      process's clock and cannot be trusted.
pub(crate) struct MonotonicAnchor {
    anchor_instant: Instant,
    anchor_boot_ns: u64,
}

impl MonotonicAnchor {
    /// Returns the single anchor shared by the whole process, creating it on
    /// the first call.
    fn get() -> &'static Self {
        static ANCHOR: OnceLock<MonotonicAnchor> = OnceLock::new();
        ANCHOR.get_or_init(Self::capture)
    }

    /// Builds a fresh anchor from the current wall clock and `Instant`.
    ///
    /// The boot marker is the wall-clock time at creation, in nanoseconds since
    /// the Unix epoch, saturated to `u64`; a pre-1970 clock yields `0`. Two
    /// processes created in the same nanosecond on one machine would share a
    /// marker, and the mismatch check could not tell them apart. That collision
    /// is accepted as a known limitation.
    fn capture() -> Self {
        let boot_wall = std::time::SystemTime::now()
            .duration_since(std::time::UNIX_EPOCH)
            .unwrap_or_default();
        let anchor_boot_ns = u64::try_from(boot_wall.as_nanos()).unwrap_or(u64::MAX);
        Self {
            anchor_instant: Instant::now(),
            anchor_boot_ns,
        }
    }

    /// Nanoseconds elapsed since the anchor was created, saturating to
    /// `i64::MAX` after roughly 292 years.
    fn now_mono_ns(&self) -> i64 {
        let since_anchor = self.anchor_instant.elapsed();
        i64::try_from(since_anchor.as_nanos()).unwrap_or(i64::MAX)
    }
}

/// Production [`Clock`] backed by the operating system's wall clock and a
/// process-wide monotonic anchor.
///
/// Wall-clock readings (`now_us`, `now_wall_ns`) come from `SystemTime` and
/// saturate instead of overflowing. Monotonic readings (`now_mono_ns`) are
/// nanoseconds since the shared process-wide anchor. They are non-decreasing
/// within a single process and meaningless across processes; compare
/// [`Clock::process_boot_ns`] to detect cross-process values. They saturate to
/// `i64::MAX` if the process has been alive for more than ~292 years.
///
/// Every `SystemClock` in a process refers to the same anchor, so instances
/// are interchangeable and cloning only copies a reference.
#[derive(Clone)]
pub struct SystemClock {
    /// The process-wide monotonic anchor shared by all instances.
    anchor: &'static MonotonicAnchor,
}

impl SystemClock {
    /// Create a production clock backed by system wall time and process-local monotonic time.
    ///
    /// The first call in a process initialises the shared monotonic anchor;
    /// later calls reuse it.
    pub fn new() -> Self {
        Self {
            anchor: MonotonicAnchor::get(),
        }
    }
}

impl Default for SystemClock {
    /// Equivalent to [`SystemClock::new`].
    fn default() -> Self {
        Self::new()
    }
}

impl Clock for SystemClock {
    /// Boot marker recorded when the shared process-wide anchor was created.
    fn process_boot_ns(&self) -> u64 {
        let anchor = self.anchor;
        anchor.anchor_boot_ns
    }

    /// Nanoseconds elapsed since the shared process-wide anchor.
    fn now_mono_ns(&self) -> i64 {
        let anchor = self.anchor;
        anchor.now_mono_ns()
    }

    /// `SystemTime` as microseconds since the Unix epoch (saturating).
    fn now_us(&self) -> i64 {
        system_now_us()
    }

    /// `SystemTime` as nanoseconds since the Unix epoch (saturating).
    fn now_wall_ns(&self) -> i64 {
        system_now_wall_ns_saturating()
    }
}

/// [`Clock`] whose wall time comes from a caller-supplied closure.
///
/// Only the wall clock is scripted: the closure's value is exposed as-is,
/// without validation. Monotonic time and the boot marker come from the same
/// process-wide anchor that [`SystemClock`] uses.
struct FnClock {
    /// Caller closure returning microseconds since the Unix epoch.
    inner: Arc<dyn Fn() -> i64 + Send + Sync>,
    /// Shared process-wide monotonic anchor.
    anchor: &'static MonotonicAnchor,
}

impl FnClock {
    /// Wraps `inner`, attaching the process-wide monotonic anchor.
    fn new(inner: Arc<dyn Fn() -> i64 + Send + Sync>) -> Self {
        let anchor = MonotonicAnchor::get();
        Self { inner, anchor }
    }
}

impl Clock for FnClock {
    /// Boot marker of the shared process-wide anchor (not caller-scripted).
    fn process_boot_ns(&self) -> u64 {
        self.anchor.anchor_boot_ns
    }

    /// Elapsed nanoseconds from the shared process-wide anchor (not caller-scripted).
    fn now_mono_ns(&self) -> i64 {
        self.anchor.now_mono_ns()
    }

    /// Invokes the caller closure and passes its value through unvalidated.
    fn now_us(&self) -> i64 {
        (self.inner)()
    }

    /// Caller microseconds scaled to nanoseconds, saturating at the `i64`
    /// bounds instead of overflowing.
    fn now_wall_ns(&self) -> i64 {
        const NANOS_PER_MICRO: i64 = 1_000;
        NANOS_PER_MICRO.saturating_mul(self.now_us())
    }
}

/// Builds a shared [`Clock`] whose wall time is supplied by `inner`, in
/// microseconds since the Unix epoch. Monotonic time and the boot marker come
/// from the process-wide anchor.
pub(crate) fn clock_from_fn(inner: Arc<dyn Fn() -> i64 + Send + Sync>) -> Arc<dyn Clock> {
    let clock = FnClock::new(inner);
    Arc::new(clock)
}

/// Non-decreasing wrapper around a clock source.
///
/// A user clock that regresses (e.g. NTP jump, manual reset) would poison age
/// comparisons — a slot cached at `now=1000` and read at `now=500` would look
/// like it's `-500` µs old, and a naive check can misclassify it. This wrapper
/// clamps each observed value to `max(last, new)`: once we see a value, we
/// never return anything smaller. Regressions emit `tracing::error!` with the
/// previous and new values and return the previous value — the user's clock
/// is broken, but the store keeps running.
#[derive(Clone)]
pub(crate) struct MonotonicClock {
    inner: Arc<dyn Clock>,
    last: Arc<AtomicI64>,
    last_wall_ns: Arc<AtomicI64>,
}

impl MonotonicClock {
    /// Wrap a clock. The returned handle is cloneable
    /// and stores shared state (`AtomicI64`) in an `Arc`, so clones observe the
    /// same non-decreasing sequence.
    ///
    /// `now_us` (microseconds) and `now_wall_ns` (nanoseconds) are each clamped
    /// against their own atomic. They are independent sequences — `SystemClock`
    /// derives them from two separate `SystemTime::now()` calls — so sharing a
    /// single atomic would cross-contaminate the two.
    pub(crate) fn wrap(inner: Arc<dyn Clock>) -> Self {
        Self {
            inner,
            last: Arc::new(AtomicI64::new(i64::MIN)),
            last_wall_ns: Arc::new(AtomicI64::new(i64::MIN)),
        }
    }

    /// Clamp `raw` to be non-decreasing relative to the highest value ever
    /// installed in `slot`. Returns `raw` when it advances the slot, otherwise
    /// logs a regression at `error` level and returns the previously installed
    /// value. The memory ordering (Acquire load, AcqRel/Acquire CAS) is the
    /// audited pattern shared by every non-decreasing clock sequence.
    fn clamp_non_decreasing(slot: &AtomicI64, raw: i64, what: &str) -> i64 {
        // Compare-and-swap loop: install `raw` if it's newer than the slot,
        // otherwise report a regression and keep the old value.
        loop {
            let prev = slot.load(Ordering::Acquire);
            if raw >= prev {
                match slot.compare_exchange(prev, raw, Ordering::AcqRel, Ordering::Acquire) {
                    Ok(_) => return raw,
                    Err(_) => continue, // another thread stored a newer value; retry
                }
            } else {
                tracing::error!("user clock regressed ({}): prev={} new={}", what, prev, raw);
                return prev;
            }
        }
    }

    /// Sample the wrapped clock and return a value that is never smaller than
    /// any value previously returned by this [`MonotonicClock`] (or any clone
    /// of it). A regression is logged at `error` level.
    pub(crate) fn now_us(&self) -> i64 {
        Self::clamp_non_decreasing(&self.last, self.inner.now_us(), "us")
    }
}

impl Clock for MonotonicClock {
    /// Forwarded unchanged from the wrapped clock.
    fn process_boot_ns(&self) -> u64 {
        self.inner.process_boot_ns()
    }

    /// Forwarded unchanged: this wrapper does not clamp monotonic time.
    fn now_mono_ns(&self) -> i64 {
        self.inner.now_mono_ns()
    }

    /// Non-decreasing microseconds, delegating to the inherent `now_us`.
    fn now_us(&self) -> i64 {
        MonotonicClock::now_us(self)
    }

    /// Wrapped wall-clock nanoseconds, clamped against their own high-water
    /// mark so they never move backwards.
    fn now_wall_ns(&self) -> i64 {
        let sample = self.inner.now_wall_ns();
        Self::clamp_non_decreasing(&self.last_wall_ns, sample, "wall_ns")
    }
}

#[cfg(test)]
mod tests {
    // Unit tests for the clock seam: FnClock's split between scripted wall time
    // and the shared monotonic anchor, MonotonicClock's per-sequence clamping,
    // and SystemClock / boot-marker plausibility.
    use super::{Clock, FnClock, MonotonicClock, SystemClock};
    use std::sync::atomic::{AtomicI64, Ordering};
    use std::sync::Arc;

    /// A mutable test clock whose `now_us` and `now_wall_ns` are independently
    /// programmable, so we can inject backward jumps into either sequence.
    /// Monotonic time and the boot marker are fixed at `0`.
    struct AdjustableClock {
        us: AtomicI64,
        wall_ns: AtomicI64,
    }

    impl AdjustableClock {
        /// Builds a shared clock starting at the given microsecond and
        /// nanosecond readings.
        fn new(us: i64, wall_ns: i64) -> Arc<Self> {
            Arc::new(Self {
                us: AtomicI64::new(us),
                wall_ns: AtomicI64::new(wall_ns),
            })
        }
        /// Sets the value the next `now_us` call returns.
        fn set_us(&self, v: i64) {
            self.us.store(v, Ordering::SeqCst);
        }
        /// Sets the value the next `now_wall_ns` call returns.
        fn set_wall_ns(&self, v: i64) {
            self.wall_ns.store(v, Ordering::SeqCst);
        }
    }

    impl Clock for AdjustableClock {
        fn now_us(&self) -> i64 {
            self.us.load(Ordering::SeqCst)
        }
        fn now_wall_ns(&self) -> i64 {
            self.wall_ns.load(Ordering::SeqCst)
        }
        fn now_mono_ns(&self) -> i64 {
            0
        }
        fn process_boot_ns(&self) -> u64 {
            0
        }
    }

    /// FnClock must pass caller wall time (even negative) through untouched,
    /// while its monotonic reading comes from the shared process anchor.
    #[test]
    fn fn_clock_preserves_negative_wall_values_but_not_monotonic_time() {
        // Anchor a monotonic floor FIRST: FnClock scripts only the wall clock,
        // so its monotonic reading must come from the same process-wide
        // MonotonicAnchor that SystemClock reads. The bounded spin (at most one
        // clock-resolution tick) pushes the floor past the trivial constants
        // (0, 1) a body-stubbing mutant substitutes for `now_mono_ns`.
        let system = SystemClock::new();
        let mut floor = system.now_mono_ns();
        while floor <= 1 {
            floor = system.now_mono_ns();
        }

        let clock = FnClock::new(Arc::new(|| -7));

        assert_eq!(
            clock.now_us(),
            -7,
            "PROPERTY: FnClock must expose malformed caller wall time for validation"
        );
        assert_eq!(
            clock.now_wall_ns(),
            -7_000,
            "PROPERTY: wall nanoseconds come from the caller wall clock, not the monotonic anchor"
        );
        // The FnClock reading must fall between two SystemClock readings taken
        // before and after it, which is only true if both share one anchor.
        let mono = clock.now_mono_ns();
        let ceiling = system.now_mono_ns();
        assert!(
            floor <= mono && mono <= ceiling,
            "PROPERTY: process-local monotonic evidence reads the shared process anchor \
             (floor {floor} <= mono {mono} <= ceiling {ceiling}), never an echo of the \
             negative caller wall clock or a stubbed constant"
        );
    }

    /// A backwards wall-clock step must stall at the high-water mark and then
    /// resume once the inner clock passes it again.
    #[test]
    fn now_wall_ns_clamps_on_regression() {
        let inner = AdjustableClock::new(1_000, 5_000_000_000);
        let mono = MonotonicClock::wrap(Arc::clone(&inner) as Arc<dyn Clock>);

        assert_eq!(
            mono.now_wall_ns(),
            5_000_000_000,
            "PROPERTY: forward wall clock passes through unchanged"
        );
        // Inner wall clock jumps backward (NTP step-back / manual reset).
        inner.set_wall_ns(1_000_000_000);
        assert_eq!(
            mono.now_wall_ns(),
            5_000_000_000,
            "PROPERTY: a regressing wall clock stalls at the highest value seen, never moving backward"
        );
        // A subsequent forward move past the high-water mark advances again.
        inner.set_wall_ns(6_000_000_000);
        assert_eq!(
            mono.now_wall_ns(),
            6_000_000_000,
            "PROPERTY: forward progress past the high-water mark resumes"
        );
    }

    /// The `us` and `wall_ns` sequences keep separate high-water marks, so a
    /// regression in one never holds back the other.
    #[test]
    fn now_us_and_now_wall_ns_clamp_states_are_independent() {
        let inner = AdjustableClock::new(1_000, 5_000_000_000);
        let mono = MonotonicClock::wrap(Arc::clone(&inner) as Arc<dyn Clock>);

        // Prime both sequences.
        assert_eq!(mono.now_us(), 1_000);
        assert_eq!(mono.now_wall_ns(), 5_000_000_000);

        // Regress only the microsecond clock; wall_ns must still advance freely.
        inner.set_us(500);
        inner.set_wall_ns(9_000_000_000);
        assert_eq!(
            mono.now_us(),
            1_000,
            "PROPERTY: regressed us sequence stalls at its own high-water mark"
        );
        assert_eq!(
            mono.now_wall_ns(),
            9_000_000_000,
            "PROPERTY: wall_ns is not contaminated by the us regression (separate atomic)"
        );

        // Regress only the wall clock; us must still advance freely.
        inner.set_us(2_000);
        inner.set_wall_ns(1_000_000_000);
        assert_eq!(
            mono.now_us(),
            2_000,
            "PROPERTY: us is not contaminated by the wall_ns regression (separate atomic)"
        );
        assert_eq!(
            mono.now_wall_ns(),
            9_000_000_000,
            "PROPERTY: regressed wall_ns sequence stalls at its own high-water mark"
        );
    }

    /// SystemClock's wall-clock nanoseconds must be a plausible real reading.
    #[test]
    fn system_clock_now_wall_ns_reports_real_wall_time() {
        // The production clock must report a genuine UNIX-epoch wall-clock
        // reading in nanoseconds, not a constant stand-in. 1_600_000_000 s
        // (~2020-09-13) is far below any real "now" yet far above the trivial
        // values (`0`, `1`) a body-stubbing mutant would substitute.
        const EARLIEST_PLAUSIBLE_WALL_NS: i64 = 1_600_000_000_000_000_000;

        let reading = SystemClock::new().now_wall_ns();
        assert!(
            reading > EARLIEST_PLAUSIBLE_WALL_NS,
            "PROPERTY: SystemClock::now_wall_ns must read the real wall clock \
             (>{EARLIEST_PLAUSIBLE_WALL_NS} ns), got {reading}"
        );
    }

    /// MonotonicClock must forward the wrapped clock's boot marker unchanged,
    /// and that marker must be a real wall-clock nanosecond value.
    #[test]
    fn monotonic_clock_process_boot_ns_delegates_real_anchor() {
        // MonotonicClock must forward the wrapped clock's process-epoch marker
        // rather than synthesize a value. The static MonotonicAnchor encodes the
        // wall-clock nanoseconds captured at first use, so any real boot marker is
        // far above the trivial constants (`0`, `1`) a body-stubbing mutant
        // substitutes. 1_600_000_000 s (~2020-09-13) is below any real "now" yet
        // far above those stubs. No blocking wait: process_boot_ns is a direct
        // return, so a divergent mutant fails an assertion immediately.
        const EARLIEST_PLAUSIBLE_BOOT_NS: u64 = 1_600_000_000_000_000_000;

        let inner = Arc::new(SystemClock::new());
        let inner_boot = inner.process_boot_ns();
        let mono = MonotonicClock::wrap(Arc::clone(&inner) as Arc<dyn Clock>);
        let wrapped_boot = mono.process_boot_ns();

        // Collect every violated property so one run reports all of them.
        let mut failures: Vec<String> = Vec::new();
        if wrapped_boot != inner_boot {
            failures.push(format!(
                "PROPERTY: MonotonicClock::process_boot_ns must delegate to the \
                 wrapped clock unchanged: wrapped={wrapped_boot}, inner={inner_boot}"
            ));
        }
        if wrapped_boot <= EARLIEST_PLAUSIBLE_BOOT_NS {
            failures.push(format!(
                "PROPERTY: the process boot marker is the real anchor wall-ns \
                 (>{EARLIEST_PLAUSIBLE_BOOT_NS}), never a trivial stub: got {wrapped_boot}"
            ));
        }
        assert!(failures.is_empty(), "{failures:?}");
    }
}