asupersync 0.3.4

Spec-first, cancel-correct, capability-secure async runtime for Rust.
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
//! Metamorphic tests for semaphore acquire/release pairing under cancellation.
//!
//! These tests verify invariant relationships that must hold regardless of
//! exact timing or cancellation patterns.

#![cfg(test)]

use std::sync::Arc;
use std::sync::atomic::{AtomicUsize, Ordering};

use crate::cx::Cx;
use crate::lab::LabRuntime;
use crate::sync::{AcquireError, Semaphore};
use crate::types::CancelKind;
use proptest::prelude::*;

/// MR1: Total Conservation - Permits are conserved across all operations
/// The total number of permits in the system (held + available) equals initial count
/// regardless of cancellation patterns.
#[derive(Debug)]
struct PermitConservation {
    initial_permits: usize,
    semaphore: Arc<Semaphore>,
    live_permits: AtomicUsize,
}

impl PermitConservation {
    fn new(initial_permits: usize) -> Self {
        Self {
            initial_permits,
            semaphore: Arc::new(Semaphore::new(initial_permits)),
            live_permits: AtomicUsize::new(0),
        }
    }

    fn verify_conservation(&self) {
        let available = self.semaphore.available_permits();
        let held = self.live_permits.load(Ordering::Acquire);
        let total = available + held;
        assert_eq!(
            total, self.initial_permits,
            "Permit conservation violated: initial={}, available={}, held={}, total={}",
            self.initial_permits, available, held, total
        );
    }
}

/// MR2: Idempotent Close - Closing multiple times has same effect as closing once
#[test]
fn mr_idempotent_close() {
    let sem = Arc::new(Semaphore::new(5));

    // Baseline: close once
    let sem1 = sem.clone();
    sem1.close();
    let closed_once = sem1.is_closed();
    let permits_once = sem1.available_permits();

    // Transform: close multiple times
    let sem2 = Arc::new(Semaphore::new(5));
    sem2.close();
    sem2.close();
    sem2.close();
    let closed_multiple = sem2.is_closed();
    let permits_multiple = sem2.available_permits();

    // MR: f(close(x)) = f(close(close(close(x))))
    assert_eq!(
        closed_once, closed_multiple,
        "Multiple closes should be idempotent"
    );
    assert_eq!(
        permits_once, permits_multiple,
        "Permit count after close should be idempotent"
    );
}

/// MR3: Cancellation Equivalence - Different cancellation patterns with same
/// total acquired permits should leave semaphore in equivalent states
#[test]
fn mr_cancellation_equivalence() {
    futures_lite::future::block_on(async {
        let initial_permits = 10;

        // Pattern A: acquire some, cancel others immediately
        let sem_a = Arc::new(Semaphore::new(initial_permits));
        let permit_count_a = Arc::new(AtomicUsize::new(0));

        let cx_a = Cx::for_testing();
        let _permit_a1 = sem_a.acquire(&cx_a, 3).await.unwrap();
        permit_count_a.fetch_add(3, Ordering::Relaxed);

        // Try to acquire 5, but cancel the context immediately
        let cx_cancel = Cx::for_testing();
        cx_cancel.set_cancel_requested(true);
        let result_a = sem_a.acquire(&cx_cancel, 5).await;
        assert!(result_a.is_err()); // Should be cancelled

        let available_a = sem_a.available_permits();

        // Pattern B: acquire same total successfully
        let sem_b = Arc::new(Semaphore::new(initial_permits));
        let cx_b = Cx::for_testing();
        let _permit_b1 = sem_b.acquire(&cx_b, 3).await.unwrap();
        let available_b = sem_b.available_permits();

        // MR: Different cancellation patterns with same net effect should yield same available permits
        assert_eq!(
            available_a, available_b,
            "Equivalent permit usage should yield same available count regardless of cancellation pattern"
        );
    });
}

/// MR4: Scaling Linearity - If all operations scale by factor k, results scale proportionally
#[test]
fn mr_scaling_linearity() {
    futures_lite::future::block_on(async {
        let base_permits = 6;
        let scale_factor = 3;

        // Base scenario
        let sem_base = Arc::new(Semaphore::new(base_permits));
        let cx = Cx::for_testing();
        let _permit1 = sem_base.acquire(&cx, 2).await.unwrap();
        let available_base = sem_base.available_permits();

        // Scaled scenario
        let sem_scaled = Arc::new(Semaphore::new(base_permits * scale_factor));
        let _permit2 = sem_scaled.acquire(&cx, 2 * scale_factor).await.unwrap();
        let available_scaled = sem_scaled.available_permits();

        // MR: f(k*x) should scale proportionally
        assert_eq!(
            available_scaled,
            available_base * scale_factor,
            "Scaling all permits by factor {} should scale results proportionally",
            scale_factor
        );
    });
}

/// MR5: Acquire-Release Roundtrip - Acquiring then releasing should restore original state
#[test]
fn mr_acquire_release_roundtrip() {
    futures_lite::future::block_on(async {
        let sem = Arc::new(Semaphore::new(10));
        let cx = Cx::for_testing();

        let original_permits = sem.available_permits();

        // Acquire permits
        let permit = sem.acquire(&cx, 4).await.unwrap();
        let after_acquire = sem.available_permits();
        assert_eq!(after_acquire, original_permits - 4);

        // Release via drop
        drop(permit);

        let after_release = sem.available_permits();

        // MR: acquire(n) then release should restore original count
        assert_eq!(
            after_release, original_permits,
            "Acquire-release roundtrip should restore original permit count"
        );
    });
}

/// MR6: Commutativity under non-overlapping regions - Two independent acquire/release
/// sequences should be commutative if they don't overlap in time
#[test]
fn mr_non_overlapping_commutativity() {
    futures_lite::future::block_on(async {
        let sem = Arc::new(Semaphore::new(10));
        let cx = Cx::for_testing();

        // Sequence A then B
        let sem_ab = sem.clone();
        let permit_a1 = sem_ab.acquire(&cx, 3).await.unwrap();
        drop(permit_a1);
        let permit_b1 = sem_ab.acquire(&cx, 2).await.unwrap();
        drop(permit_b1);
        let final_ab = sem_ab.available_permits();

        // Reset semaphore for sequence B then A
        let sem_ba = Arc::new(Semaphore::new(10));
        let permit_b2 = sem_ba.acquire(&cx, 2).await.unwrap();
        drop(permit_b2);
        let permit_a2 = sem_ba.acquire(&cx, 3).await.unwrap();
        drop(permit_a2);
        let final_ba = sem_ba.available_permits();

        // MR: Non-overlapping sequences should be commutative
        assert_eq!(
            final_ab, final_ba,
            "Non-overlapping acquire/release sequences should be commutative"
        );
    });
}

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum RetryErrorKind {
    Cancelled,
    Closed,
}

fn canonical_acquire_error(error: AcquireError) -> RetryErrorKind {
    match error {
        AcquireError::Cancelled => RetryErrorKind::Cancelled,
        AcquireError::Closed | AcquireError::PolledAfterCompletion => RetryErrorKind::Closed,
    }
}

fn retry_error_sequence(seed: u64, attempts: usize, closed: bool) -> Vec<RetryErrorKind> {
    let mut runtime = LabRuntime::with_seed(seed);
    let mut sequence = Vec::with_capacity(attempts);

    for _ in 0..attempts {
        let sem = Semaphore::new(0);
        let cx = Cx::for_testing();
        if closed {
            sem.close();
        } else {
            cx.cancel_fast(CancelKind::User);
        }

        let error = match futures_lite::future::block_on(sem.acquire(&cx, 1)) {
            Ok(_) => panic!("retry attempt should produce a canonical acquisition error"),
            Err(error) => error,
        };
        sequence.push(canonical_acquire_error(error));
    }

    let violations = runtime.check_invariants();
    assert!(
        violations.is_empty(),
        "lab runtime invariant violations during retry MR: {violations:?}"
    );
    sequence
}

/// MR7b: Deterministic retry error sequence.
///
/// Transformation: replay the same N-attempt retry trace under the same lab
/// seed and identical input condition.
///
/// Relation: every replay produces the same canonical error sequence.
#[test]
fn mr_retry_same_input_replays_same_canonical_error_sequence() {
    proptest!(|(
        seed in any::<u64>(),
        attempts in 1usize..12,
        closed in any::<bool>(),
    )| {
        let first = retry_error_sequence(seed, attempts, closed);
        let second = retry_error_sequence(seed, attempts, closed);

        prop_assert_eq!(&first, &second,
            "retrying the same input under deterministic mode changed the canonical error sequence");
        prop_assert_eq!(first.len(), attempts,
            "retry sequence length should equal the requested attempt count");
        prop_assert!(
            first.iter().all(|kind| *kind == first[0]),
            "same-input retries should not alternate canonical error kind: {first:?}"
        );
    });
}

/// MR7: Cancel Monotonicity - Adding cancellation should never increase available permits
#[test]
fn mr_cancel_monotonicity() {
    futures_lite::future::block_on(async {
        let sem = Arc::new(Semaphore::new(8));

        // Scenario without cancellation
        let cx_no_cancel = Cx::for_testing();
        let _permit1 = sem.acquire(&cx_no_cancel, 3).await.unwrap();
        let available_no_cancel = sem.available_permits();

        // Reset and try with cancellation
        let sem_with_cancel = Arc::new(Semaphore::new(8));
        let cx_with_cancel = Cx::for_testing();
        let _permit2 = sem_with_cancel.acquire(&cx_with_cancel, 3).await.unwrap();

        // Try to acquire more but cancel it
        let cx_cancel = Cx::for_testing();
        cx_cancel.set_cancel_requested(true);
        let _ = sem_with_cancel.acquire(&cx_cancel, 2).await; // Should fail

        let available_with_cancel = sem_with_cancel.available_permits();

        // MR: Adding cancellation should not increase available permits
        assert!(
            available_with_cancel <= available_no_cancel + 2,
            "Cancellation should not increase available permits beyond what would be released"
        );
    });
}

/// MR8: Batch vs Sequential Equivalence - Acquiring N permits at once vs N permits
/// one-by-one should have equivalent end state (when all succeed)
#[test]
fn mr_batch_vs_sequential_equivalence() {
    futures_lite::future::block_on(async {
        let cx = Cx::for_testing();

        // Batch acquire
        let sem_batch = Arc::new(Semaphore::new(10));
        let _batch_permit = sem_batch.acquire(&cx, 4).await.unwrap();
        let available_batch = sem_batch.available_permits();

        // Sequential acquire
        let sem_sequential = Arc::new(Semaphore::new(10));
        let _permit1 = sem_sequential.acquire(&cx, 1).await.unwrap();
        let _permit2 = sem_sequential.acquire(&cx, 1).await.unwrap();
        let _permit3 = sem_sequential.acquire(&cx, 1).await.unwrap();
        let _permit4 = sem_sequential.acquire(&cx, 1).await.unwrap();
        let available_sequential = sem_sequential.available_permits();

        // MR: Batch and sequential should yield same available count
        assert_eq!(
            available_batch, available_sequential,
            "Batch vs sequential acquire should yield equivalent end state"
        );
    });
}

/// Property-based test for permit conservation under random operations
proptest! {
    #[test]
    fn prop_permit_conservation(
        initial_permits in 1usize..20,
        operations in prop::collection::vec(
            (1usize..5, prop::bool::ANY), // (count, should_cancel)
            0..10
        )
    ) {
        futures_lite::future::block_on(async {
            let conservation = PermitConservation::new(initial_permits);
            conservation.verify_conservation();

            let mut permits_held = Vec::new();

            for (idx, (count, should_cancel)) in operations.into_iter().enumerate() {
                if count > initial_permits { continue; } // Skip impossible requests
                if !should_cancel && count > conservation.semaphore.available_permits() {
                    continue;
                } // Avoid waiting forever on permits held by earlier operations.

                let cx = Cx::for_testing();
                if should_cancel {
                    cx.set_cancel_requested(true);
                }

                match conservation.semaphore.acquire(&cx, count).await {
                    Ok(permit) => {
                        conservation.live_permits.fetch_add(count, Ordering::Release);
                        permits_held.push((permit, count));
                    },
                    Err(_) => {
                        // Cancellation or semaphore closed - expected
                    }
                }

                conservation.verify_conservation();

                // Randomly release some permits
                if !permits_held.is_empty() && (idx + permits_held.len()) % 2 == 0 {
                    if let Some((permit, count)) = permits_held.pop() {
                        conservation.live_permits.fetch_sub(count, Ordering::Release);
                        drop(permit);
                        conservation.verify_conservation();
                    }
                }
            }

            // Clean up remaining permits
            for (permit, count) in permits_held {
                conservation.live_permits.fetch_sub(count, Ordering::Release);
                drop(permit);
                conservation.verify_conservation();
            }
        });
    }
}

/// Composite MR: Conservation + Cancellation + Scaling
#[test]
fn mr_composite_properties() {
    futures_lite::future::block_on(async {
        let base_permits = 6;
        let scale = 2;

        // Create scaled semaphore
        let sem = Arc::new(Semaphore::new(base_permits * scale));
        let cx = Cx::for_testing();

        // Acquire scaled amounts
        let permit1 = sem.acquire(&cx, 2 * scale).await.unwrap();
        let after_acquire = sem.available_permits();

        // Cancel an operation (should not affect state)
        let cx_cancel = Cx::for_testing();
        cx_cancel.set_cancel_requested(true);
        let _ = sem.acquire(&cx_cancel, scale).await; // Should fail

        let after_cancel = sem.available_permits();
        assert_eq!(
            after_acquire, after_cancel,
            "Cancelled operation should not change state"
        );

        // Release and verify conservation
        drop(permit1);
        let final_permits = sem.available_permits();
        assert_eq!(
            final_permits,
            base_permits * scale,
            "Final state should conserve all permits"
        );
    });
}