asupersync 0.3.1

Spec-first, cancel-correct, capability-secure async runtime for Rust.
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
//! Causal order verification for trace events.
//!
//! Uses logical timestamps attached to [`TraceEvent`]s to verify that
//! the recorded execution respects the happens-before partial order.
//!
//! # Verification Rules
//!
//! 1. **Monotonic sequence**: Within the same logical clock domain,
//!    later sequence numbers must have greater-or-equal logical times.
//! 2. **Causal consistency**: If event A causally precedes event B
//!    (same task, A.seq < B.seq), then A's logical time must be
//!    strictly less than B's logical time.
//! 3. **No backward causation**: A receive event's logical time must
//!    be strictly greater than the sender's logical time at send.

use crate::trace::distributed::{CausalOrder, LogicalTime};
use crate::trace::event::{TraceData, TraceEvent, TraceEventKind};
use crate::types::TaskId;
use core::fmt;
use std::collections::BTreeMap;

fn causal_order(a: &LogicalTime, b: &LogicalTime) -> CausalOrder {
    match a.partial_cmp(b) {
        Some(std::cmp::Ordering::Less) => CausalOrder::Before,
        Some(std::cmp::Ordering::Greater) => CausalOrder::After,
        Some(std::cmp::Ordering::Equal) => CausalOrder::Equal,
        None => CausalOrder::Concurrent,
    }
}

/// A violation of causal ordering in a trace.
#[derive(Debug, Clone)]
pub struct CausalityViolation {
    /// The kind of violation.
    pub kind: CausalityViolationKind,
    /// Index of the earlier event in the trace.
    pub earlier_idx: usize,
    /// Index of the later event in the trace.
    pub later_idx: usize,
    /// Sequence number of the earlier event.
    pub earlier_seq: u64,
    /// Sequence number of the later event.
    pub later_seq: u64,
}

/// The kind of causal ordering violation.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum CausalityViolationKind {
    /// Logical time went backward for sequentially ordered events.
    NonMonotonic,
    /// Two events on the same task have incomparable (concurrent) logical
    /// times, but they must be ordered since they share a single thread
    /// of execution.
    SameTaskConcurrent,
    /// A dependent event (e.g. wake after spawn) has a logical time that
    /// does not reflect the causal dependency.
    MissingDependency,
}

impl fmt::Display for CausalityViolation {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(
            f,
            "{:?}: event[{}] (seq={}) -> event[{}] (seq={})",
            self.kind, self.earlier_idx, self.earlier_seq, self.later_idx, self.later_seq,
        )
    }
}

/// Verifies causal consistency of a trace using attached logical timestamps.
///
/// Events without logical timestamps are skipped during verification.
pub struct CausalOrderVerifier {
    violations: Vec<CausalityViolation>,
}

impl CausalOrderVerifier {
    /// Verify causal ordering of a trace.
    ///
    /// Returns `Ok(())` if the trace is causally consistent, or `Err` with
    /// all detected violations.
    pub fn verify(trace: &[TraceEvent]) -> Result<(), Vec<CausalityViolation>> {
        let mut verifier = Self {
            violations: Vec::new(),
        };

        verifier.check_monotonic(trace);
        verifier.check_same_task_ordering(trace);
        verifier.check_causal_dependencies(trace);

        if verifier.violations.is_empty() {
            Ok(())
        } else {
            Err(verifier.violations)
        }
    }

    /// Check that logical times are monotonically non-decreasing by sequence.
    fn check_monotonic(&mut self, trace: &[TraceEvent]) {
        let mut last_time: Option<(usize, u64, &LogicalTime)> = None;

        for (idx, event) in trace.iter().enumerate() {
            let Some(ref lt) = event.logical_time else {
                continue;
            };

            if let Some((prev_idx, prev_seq, prev_lt)) = last_time {
                // Same-type comparison only (Lamport vs Lamport, etc.)
                if let Some(ordering) = prev_lt.partial_cmp(lt) {
                    if ordering == std::cmp::Ordering::Greater {
                        self.violations.push(CausalityViolation {
                            kind: CausalityViolationKind::NonMonotonic,
                            earlier_idx: prev_idx,
                            later_idx: idx,
                            earlier_seq: prev_seq,
                            later_seq: event.seq,
                        });
                    }
                }
                // Different clock types are incomparable — skip check
            }

            last_time = Some((idx, event.seq, lt));
        }
    }

    /// Check that events on the same task have properly ordered logical times.
    fn check_same_task_ordering(&mut self, trace: &[TraceEvent]) {
        // Group events by task
        let mut task_events: BTreeMap<TaskId, Vec<(usize, &TraceEvent)>> = BTreeMap::new();

        for (idx, event) in trace.iter().enumerate() {
            if event.logical_time.is_none() {
                continue;
            }
            if let Some(task_id) = extract_task_id(event) {
                task_events.entry(task_id).or_default().push((idx, event));
            }
        }

        for events in task_events.values() {
            for window in events.windows(2) {
                let (idx_a, ev_a) = window[0];
                let (idx_b, ev_b) = window[1];

                let lt_a = ev_a.logical_time.as_ref().expect("logical_time must exist");
                let lt_b = ev_b.logical_time.as_ref().expect("logical_time must exist");

                match causal_order(lt_a, lt_b) {
                    CausalOrder::After | CausalOrder::Concurrent | CausalOrder::Equal => {
                        self.violations.push(CausalityViolation {
                            kind: CausalityViolationKind::SameTaskConcurrent,
                            earlier_idx: idx_a,
                            later_idx: idx_b,
                            earlier_seq: ev_a.seq,
                            later_seq: ev_b.seq,
                        });
                    }
                    CausalOrder::Before => {}
                }
            }
        }
    }

    /// Check that explicit causal dependencies have proper logical time ordering.
    ///
    /// For example, a Wake event for task T should have a logical time
    /// strictly after the Spawn event for task T.
    fn check_causal_dependencies(&mut self, trace: &[TraceEvent]) {
        // Map task_id -> spawn event index
        let mut spawn_events: BTreeMap<TaskId, (usize, &TraceEvent)> = BTreeMap::new();

        for (idx, event) in trace.iter().enumerate() {
            if event.logical_time.is_none() {
                continue;
            }

            if let Some(task_id) = extract_task_id(event) {
                if event.kind == TraceEventKind::Spawn {
                    spawn_events.insert(task_id, (idx, event));
                } else if event.kind == TraceEventKind::Wake
                    || event.kind == TraceEventKind::Schedule
                {
                    // Wake/Schedule must happen after spawn
                    if let Some(&(spawn_idx, spawn_ev)) = spawn_events.get(&task_id) {
                        let spawn_lt = spawn_ev
                            .logical_time
                            .as_ref()
                            .expect("logical_time must exist");
                        let current_lt = event
                            .logical_time
                            .as_ref()
                            .expect("logical_time must exist");

                        match causal_order(spawn_lt, current_lt) {
                            CausalOrder::After | CausalOrder::Equal | CausalOrder::Concurrent => {
                                self.violations.push(CausalityViolation {
                                    kind: CausalityViolationKind::MissingDependency,
                                    earlier_idx: spawn_idx,
                                    later_idx: idx,
                                    earlier_seq: spawn_ev.seq,
                                    later_seq: event.seq,
                                });
                            }
                            CausalOrder::Before => {}
                        }
                    }
                }
            }
        }
    }
}

/// Extract the task ID from a trace event, if present.
fn extract_task_id(event: &TraceEvent) -> Option<TaskId> {
    match &event.data {
        TraceData::Task { task, .. }
        | TraceData::Cancel { task, .. }
        | TraceData::Futurelock { task, .. }
        | TraceData::Obligation { task, .. }
        | TraceData::Worker { task, .. } => Some(*task),
        _ => None,
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::remote::NodeId;
    use crate::trace::distributed::{LamportClock, VectorClock};
    use crate::types::{RegionId, Time};

    fn task(id: u32) -> TaskId {
        TaskId::new_for_test(id, 0)
    }

    fn region() -> RegionId {
        RegionId::new_for_test(0, 0)
    }

    fn spawn_event(seq: u64, task_id: TaskId, lt: LogicalTime) -> TraceEvent {
        TraceEvent::spawn(seq, Time::ZERO, task_id, region()).with_logical_time(lt)
    }

    fn schedule_event(seq: u64, task_id: TaskId, lt: LogicalTime) -> TraceEvent {
        TraceEvent::schedule(seq, Time::ZERO, task_id, region()).with_logical_time(lt)
    }

    fn wake_event(seq: u64, task_id: TaskId, lt: LogicalTime) -> TraceEvent {
        TraceEvent::wake(seq, Time::ZERO, task_id, region()).with_logical_time(lt)
    }

    fn complete_event(seq: u64, task_id: TaskId, lt: LogicalTime) -> TraceEvent {
        TraceEvent::complete(seq, Time::ZERO, task_id, region()).with_logical_time(lt)
    }

    fn lamport_tick(clock: &LamportClock) -> LogicalTime {
        LogicalTime::Lamport(clock.tick())
    }

    // =========================================================================
    // Lamport clock tests
    // =========================================================================

    #[test]
    fn causal_verify_empty_trace() {
        assert!(CausalOrderVerifier::verify(&[]).is_ok());
    }

    #[test]
    fn causal_verify_single_event() {
        let clock = LamportClock::new();
        let trace = vec![spawn_event(0, task(1), lamport_tick(&clock))];
        assert!(CausalOrderVerifier::verify(&trace).is_ok());
    }

    #[test]
    fn causal_verify_monotonic_lamport() {
        let clock = LamportClock::new();
        let trace = vec![
            spawn_event(0, task(1), lamport_tick(&clock)),
            schedule_event(1, task(1), lamport_tick(&clock)),
            wake_event(2, task(1), lamport_tick(&clock)),
            complete_event(3, task(1), lamport_tick(&clock)),
        ];
        assert!(CausalOrderVerifier::verify(&trace).is_ok());
    }

    #[test]
    fn causal_verify_non_monotonic_lamport() {
        let clock = LamportClock::new();
        let t3 = LogicalTime::Lamport(clock.tick());
        let _t2 = LogicalTime::Lamport(clock.tick());
        let t1 = LogicalTime::Lamport(clock.tick());
        // Deliberately reversed: higher logical time first
        let trace = vec![
            spawn_event(0, task(1), t1),
            schedule_event(1, task(1), t3), // t3 < t1, non-monotonic
        ];
        let err = CausalOrderVerifier::verify(&trace).unwrap_err();
        assert!(
            err.iter()
                .any(|v| v.kind == CausalityViolationKind::NonMonotonic)
        );
    }

    #[test]
    fn causal_verify_same_task_ordering() {
        let clock = LamportClock::new();
        let t1 = LogicalTime::Lamport(clock.tick());
        let t2 = LogicalTime::Lamport(clock.tick());
        // task(1) spawn at t2, then schedule at t1 — violation
        let trace = vec![spawn_event(0, task(1), t2), schedule_event(1, task(1), t1)];
        let err = CausalOrderVerifier::verify(&trace).unwrap_err();
        assert!(
            err.iter()
                .any(|v| v.kind == CausalityViolationKind::SameTaskConcurrent)
        );
    }

    #[test]
    fn causal_verify_spawn_before_wake() {
        let clock = LamportClock::new();
        let trace = vec![
            spawn_event(0, task(1), lamport_tick(&clock)),
            wake_event(1, task(1), lamport_tick(&clock)),
        ];
        assert!(CausalOrderVerifier::verify(&trace).is_ok());
    }

    #[test]
    fn causal_verify_wake_before_spawn_violation() {
        let clock = LamportClock::new();
        let t1 = LogicalTime::Lamport(clock.tick());
        let t2 = LogicalTime::Lamport(clock.tick());
        // Spawn at t2 but wake at t1 — wake doesn't reflect spawn dependency
        let trace = vec![spawn_event(0, task(1), t2), wake_event(1, task(1), t1)];
        let err = CausalOrderVerifier::verify(&trace).unwrap_err();
        assert!(
            err.iter()
                .any(|v| v.kind == CausalityViolationKind::MissingDependency)
        );
    }

    // =========================================================================
    // Vector clock tests
    // =========================================================================

    #[test]
    fn causal_verify_concurrent_tasks_vector_clock() {
        // Two tasks with independent vector clocks — concurrent is fine
        // for different tasks
        let mut vc_a = VectorClock::new();
        let mut vc_b = VectorClock::new();
        let node_a = NodeId::new("node-a");
        let node_b = NodeId::new("node-b");

        vc_a.increment(&node_a);
        vc_b.increment(&node_b);

        let trace = vec![
            spawn_event(0, task(1), LogicalTime::Vector(vc_a.clone())),
            spawn_event(1, task(2), LogicalTime::Vector(vc_b.clone())),
        ];
        // Different tasks, concurrent clocks — should pass
        assert!(CausalOrderVerifier::verify(&trace).is_ok());
    }

    #[test]
    fn causal_verify_vector_clock_happens_before() {
        let mut vc = VectorClock::new();
        let node = NodeId::new("node-a");

        vc.increment(&node);
        let t1 = LogicalTime::Vector(vc.clone());
        vc.increment(&node);
        let t2 = LogicalTime::Vector(vc.clone());

        let trace = vec![spawn_event(0, task(1), t1), schedule_event(1, task(1), t2)];
        assert!(CausalOrderVerifier::verify(&trace).is_ok());
    }

    // =========================================================================
    // Events without logical time
    // =========================================================================

    #[test]
    fn causal_verify_events_without_logical_time_skipped() {
        // Events without logical_time should be ignored
        let trace = vec![
            TraceEvent::spawn(0, Time::ZERO, task(1), region()),
            TraceEvent::complete(1, Time::ZERO, task(1), region()),
        ];
        assert!(CausalOrderVerifier::verify(&trace).is_ok());
    }

    #[test]
    fn causal_verify_mixed_annotated_and_unannotated() {
        let clock = LamportClock::new();
        let trace = vec![
            spawn_event(0, task(1), lamport_tick(&clock)),
            TraceEvent::schedule(1, Time::ZERO, task(1), region()), // no logical time
            wake_event(2, task(1), lamport_tick(&clock)),
        ];
        assert!(CausalOrderVerifier::verify(&trace).is_ok());
    }

    // =========================================================================
    // Multiple tasks interleaved
    // =========================================================================

    #[test]
    fn causal_verify_interleaved_tasks_correct() {
        let clock = LamportClock::new();
        let trace = vec![
            spawn_event(0, task(1), lamport_tick(&clock)),
            spawn_event(1, task(2), lamport_tick(&clock)),
            schedule_event(2, task(1), lamport_tick(&clock)),
            schedule_event(3, task(2), lamport_tick(&clock)),
            complete_event(4, task(1), lamport_tick(&clock)),
            complete_event(5, task(2), lamport_tick(&clock)),
        ];
        assert!(CausalOrderVerifier::verify(&trace).is_ok());
    }
}