1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
// crates/xlog-cuda/tests/test_runtime_a2_via_runtime.rs
//! A2 variant exercised through the [`XlogDeviceRuntime`] facade
//! rather than against an `AsyncCudaResource` directly.
//!
//! The original `test_runtime_a2_stream_lifetime.rs` constructs an
//! `AsyncCudaResource` and calls its trait methods directly. That
//! proves the resource itself honors the stream-ordered contract,
//! but it bypasses the runtime — so it would not catch a regression
//! introduced in `XlogDeviceRuntime::allocate`/`deallocate`/
//! `reap_pending` that mishandled stream-ordering across the
//! mutex-protected `Box<dyn DeviceMemoryResource>` boundary (e.g.,
//! holding the resource lock too long, dropping it across a sync,
//! or losing the pending-bytes invariant).
//!
//! This test composes a runtime via
//! [`XlogDeviceRuntime::with_resource`] using `AsyncCudaResource`
//! as the active backend, then re-runs the A2 contract through the
//! runtime's public API. It does **not** exercise the singleton runtime,
//! which still uses the cudarc default (non-pooled) backend.
//!
//! Same shape as A2:
//!
//! 1. `runtime.allocate` block A on a non-default stream.
//! 2. Async HtoD pattern_a (no host sync).
//! 3. `runtime.deallocate(A)` — queues cuMemFreeAsync. Bytes
//! remain counted in `runtime.bytes_outstanding()` until
//! `reap_pending`.
//! 4. `runtime.allocate` block B on the same stream — may reuse
//! A's address.
//! 5. Async HtoD pattern_b.
//! 6. `cuStreamSynchronize`.
//! 7. Read back B; must equal pattern_b.
//! 8. `runtime.deallocate(B)`, `runtime.reap_pending()`.
//! 9. `runtime.bytes_outstanding() == 0`.
//!
//! Skips (returns early) when no CUDA device can be opened or when
//! `StreamPool::acquire` fails to hand out a stream.
use std::sync::Arc;
use cudarc::driver::sys;
use xlog_cuda::device_runtime::{
AllocTag, AsyncCudaResource, StreamId, StreamPool, XlogDeviceRuntime,
};
use xlog_cuda::CudaDevice;
/// Size in bytes of each test allocation (blocks A and B) and of the
/// host patterns written into them.
const BYTES: usize = 4 * 1024;
/// Enqueues an asynchronous host-to-device copy of `src` into the device
/// address `dst` on `stream`, panicking if the driver rejects the call.
///
/// The function does not wait for the copy to execute. Callers must
/// synchronize `stream` before relying on the destination contents.
///
/// # Safety
///
/// - A CUDA context must be current on the calling thread, and `stream`
///   must be a valid stream handle in that context.
/// - `dst` must be a device pointer to at least `src.len()` bytes. It must
///   remain valid at the point the copy executes in `stream` order.
/// - If `src` is page-locked host memory, it must stay alive and unmodified
///   until the copy has executed on `stream`.
///
/// # Panics
///
/// Panics if `cuMemcpyHtoDAsync_v2` does not return `CUDA_SUCCESS`.
unsafe fn htod_async(stream: sys::CUstream, dst: u64, src: &[u8]) {
    // SAFETY: the caller upholds the contract documented above. `src`
    // supplies exactly `src.len()` readable bytes for the driver to copy.
    let res = unsafe {
        sys::cuMemcpyHtoDAsync_v2(dst, src.as_ptr() as *const _, src.len(), stream)
    };
    assert_eq!(
        res,
        sys::cudaError_enum::CUDA_SUCCESS,
        "cuMemcpyHtoDAsync_v2 returned {:?}",
        res
    );
}
/// Copies `dst.len()` bytes from the device address `src` into `dst`,
/// blocking until the copy completes, and panics if the driver rejects it.
///
/// # Safety
///
/// - A CUDA context must be current on the calling thread.
/// - `src` must be a device pointer to at least `dst.len()` readable bytes.
/// - Writes to `src` queued on other streams should already be complete,
///   for example via a stream synchronize. Otherwise the copy may not
///   observe them.
///
/// # Panics
///
/// Panics if `cuMemcpyDtoH_v2` does not return `CUDA_SUCCESS`.
unsafe fn dtoh_sync(dst: &mut [u8], src: u64) {
    // SAFETY: the caller upholds the contract documented above. `dst` is a
    // unique borrow with exactly `dst.len()` writable bytes.
    let res = unsafe { sys::cuMemcpyDtoH_v2(dst.as_mut_ptr() as *mut _, src, dst.len()) };
    assert_eq!(
        res,
        sys::cudaError_enum::CUDA_SUCCESS,
        "cuMemcpyDtoH_v2 returned {:?}",
        res
    );
}
/// Runs the A2 stream-ordered alloc/free/reuse contract through the
/// [`XlogDeviceRuntime`] facade, with an `AsyncCudaResource` as the active
/// backend.
///
/// Returns early (skipping the test) when no CUDA device can be opened or
/// when `StreamPool::acquire` fails.
///
/// # Panics
///
/// Panics in any of these cases:
/// - a runtime call fails;
/// - the acquired stream is `StreamId::DEFAULT`;
/// - block B's readback differs from `pattern_b`;
/// - `bytes_outstanding` is non-zero after the final `reap_pending`.
#[test]
fn a2_stream_ordered_alloc_free_reuse_through_runtime_facade() {
    let Some(device) = CudaDevice::new(0).ok().map(Arc::new) else {
        eprintln!("Skipping A2-via-runtime: CUDA runtime unavailable");
        return;
    };
    let pool = Arc::new(StreamPool::with_defaults(Arc::clone(&device)));
    // Build the runtime around an explicit AsyncCudaResource so every
    // allocation below goes through the facade, not the resource directly.
    let resource = Box::new(AsyncCudaResource::new(
        Arc::clone(&device),
        0,
        Arc::clone(&pool),
    ));
    let runtime =
        XlogDeviceRuntime::with_resource(Arc::clone(&device), 0, Arc::clone(&pool), resource);

    let stream_id = match pool.acquire() {
        Ok(id) => id,
        Err(e) => {
            eprintln!("Skipping A2-via-runtime: StreamPool::acquire failed: {}", e);
            return;
        }
    };
    // A2 is defined on a non-default stream (see module docs).
    assert_ne!(stream_id, StreamId::DEFAULT);
    let stream = pool
        .resolve(stream_id)
        .expect("acquired StreamId must resolve");
    let cu_stream = stream.cu_stream();

    // Phase 1: allocate A through the runtime.
    let block_a = runtime
        .allocate(BYTES, stream_id, AllocTag("a2-runtime-A"))
        .expect("runtime.allocate A");
    assert_eq!(block_a.alloc_stream, stream_id);
    assert_eq!(block_a.bytes, BYTES);
    assert_eq!(runtime.bytes_outstanding(), BYTES);
    // `deallocate` consumes the block, so keep A's address to report reuse
    // in phase 4.
    let addr_a = block_a.ptr;

    // Phase 2: queue async write of pattern_a. No host sync.
    let pattern_a = vec![0xCDu8; BYTES];
    // SAFETY: `block_a.ptr` is a live BYTES-byte allocation made on
    // `cu_stream`. `pattern_a` is BYTES long and lives until the end of the
    // test, past the stream sync in phase 6.
    unsafe {
        htod_async(cu_stream, block_a.ptr, &pattern_a);
    }

    // Phase 3: deallocate A through the runtime. The queued
    // cuMemFreeAsync must remain counted as pending until reap.
    runtime.deallocate(block_a).expect("runtime.deallocate A");
    assert_eq!(
        runtime.bytes_outstanding(),
        BYTES,
        "runtime.bytes_outstanding must report pending free bytes after async deallocate"
    );

    // Phase 4: allocate B on the same stream through the runtime.
    let block_b = runtime
        .allocate(BYTES, stream_id, AllocTag("a2-runtime-B"))
        .expect("runtime.allocate B");
    assert_eq!(block_b.alloc_stream, stream_id);
    assert_eq!(block_b.bytes, BYTES);
    // The reuse hazard is exercised when B lands on A's freed memory. Log
    // which case occurred so a vacuous pass is visible in test output.
    if block_b.ptr == addr_a {
        eprintln!("A2-via-runtime: block B reused A's address {:#x}", addr_a);
    } else {
        eprintln!(
            "A2-via-runtime: block B did not reuse A's address; reuse hazard not exercised"
        );
    }

    // Phase 5: queue async write of pattern_b.
    let pattern_b = vec![0xEFu8; BYTES];
    // SAFETY: `block_b.ptr` is a live BYTES-byte allocation made on
    // `cu_stream`. `pattern_b` is BYTES long and outlives the copy.
    unsafe {
        htod_async(cu_stream, block_b.ptr, &pattern_b);
    }

    // Phase 6: synchronize the stream once. The blocking readback below is
    // not guaranteed to be ordered after work on `cu_stream`. This sync
    // ensures every queued write has completed before it runs.
    stream.synchronize().expect("stream sync");

    // Phase 7: read back B. If the runtime facade or the underlying
    // resource broke stream ordering, the readback would mix
    // pattern_a bytes into B.
    let mut readback = vec![0u8; BYTES];
    // SAFETY: `block_b.ptr` stays allocated until phase 8, and `readback` is
    // BYTES long. Phase 6 completed all writes queued on `cu_stream`.
    unsafe {
        dtoh_sync(&mut readback, block_b.ptr);
    }
    assert_eq!(
        readback, pattern_b,
        "stream-ordered reuse violated through runtime facade: \
         B contains stale bytes from A's queued write"
    );

    // Phase 8: deallocate B + final reap.
    runtime.deallocate(block_b).expect("runtime.deallocate B");
    runtime.reap_pending().expect("runtime.reap_pending");

    // Phase 9: counters return to zero.
    assert_eq!(runtime.bytes_outstanding(), 0);
}