1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
// crates/xlog-cuda/tests/test_setup_provider_with_runtime.rs
//! Higher-level construction-path test for the v0.6 opt-in fixture.
//!
//! `tests/common/mod.rs::setup_provider` is the canonical higher-
//! level construction path used across xlog-cuda's integration
//! tests: it builds a `CudaKernelProvider` for "I need a working
//! provider" callers without exposing the device, memory manager,
//! or kernel-loading details. The opt-in v0.6 sibling
//! `setup_provider_with_runtime` produces the same provider shape
//! but composes the runtime stack
//! `GlobalDeviceBudget(LoggingResource(AsyncCudaResource))` and
//! constructs the provider via the `with_runtime` constructor.
//!
//! This test exercises the new fixture end-to-end: build the
//! provider via the runtime fixture, run a real provider operation
//! (`create_buffer_from_slice`), then assert the runtime stack
//! observed the allocation / deallocation through the same
//! manager `Arc` that the provider holds. Drop + reap returns the
//! runtime to baseline.
//!
//! Out of scope: A3 parallel stress, join prototype rebase, any
//! migration of external `setup_provider` call sites. The legacy
//! fixture remains the default for tests that do not need to
//! observe runtime routing.
mod common;
use common::setup_provider_with_runtime;
use xlog_core::{ScalarType, Schema};
use xlog_cuda::device_runtime::{LogAction, LogResult};
#[test]
fn setup_provider_with_runtime_routes_real_provider_alloc() {
let Some(handles) = setup_provider_with_runtime() else {
return;
};
// Sanity: the manager the fixture returned is the same Arc the
// provider holds, and it has a runtime attached.
assert!(handles.memory.runtime().is_some());
assert!(std::sync::Arc::ptr_eq(
handles.provider.memory(),
&handles.memory
));
let baseline_runtime = handles.runtime.bytes_outstanding();
let baseline_local = handles.memory.allocated_bytes();
let baseline_records = handles.sink.len();
// Drive a real provider operation that internally calls
// memory.alloc::<u8>(..) (and a small alloc::<u32>(1) for the
// device row count). The exact allocation count is an
// implementation detail; the routing invariant is what we pin.
let col0: Vec<u32> = vec![10, 20, 30, 40, 50, 60, 70, 80];
let column_bytes = col0.len() * std::mem::size_of::<u32>();
let schema = Schema::new(vec![("col0".to_string(), ScalarType::U32)]);
let buffer = handles
.provider
.create_buffer_from_slice::<u32>(&col0, schema)
.expect("create_buffer_from_slice succeeds under the fixture's generous budget");
// Local + runtime counters mirror the same set of allocations.
let post_local = handles.memory.allocated_bytes();
let post_runtime = handles.runtime.bytes_outstanding();
assert!(post_local > baseline_local);
assert_eq!(
post_runtime - baseline_runtime,
(post_local - baseline_local) as usize,
"runtime delta must match manager delta when routing through with_runtime"
);
assert!(
post_local - baseline_local >= column_bytes as u64,
"post-call total ({}) must include at least the column ({} bytes)",
post_local - baseline_local,
column_bytes,
);
// Sink saw new Allocate records, all Ok, and at least one for
// the column byte size.
let recs = handles.sink.snapshot();
let new_records: Vec<_> = recs.iter().skip(baseline_records).collect();
assert!(!new_records.is_empty());
assert!(new_records.iter().all(|r| r.result == LogResult::Ok));
assert!(
new_records
.iter()
.any(|r| r.action == LogAction::Allocate && r.bytes == Some(column_bytes)),
"expected an Allocate record for {} bytes, got {:?}",
column_bytes,
new_records
);
// Drop the buffer: TrackedCudaSlice<u8>/<u32> with
// Backing::Runtime queues async frees through the runtime; the
// manager counter releases immediately, the runtime holds
// pending until reap.
drop(buffer);
assert_eq!(handles.memory.allocated_bytes(), baseline_local);
assert_eq!(
handles.runtime.bytes_outstanding(),
post_runtime,
"async backend: runtime holds bytes pending until reap"
);
handles.runtime.reap_pending().expect("reap");
assert_eq!(handles.runtime.bytes_outstanding(), baseline_runtime);
let recs_final = handles.sink.snapshot();
assert!(
recs_final
.iter()
.any(|r| r.action == LogAction::Deallocate && r.bytes == Some(column_bytes)),
"expected a Deallocate record for {} bytes after drop + reap",
column_bytes
);
}