1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
//! Host <-> device transfer operations.
//!
//! Generic versions of `download_column` and `create_buffer_from_slice`,
//! replacing the 15 type-specialized functions with 3 generics.
use std::sync::atomic::Ordering;
use xlog_core::{Result, Schema, XlogError};
use crate::type_seam::GpuScalar;
use crate::CudaBuffer;
impl super::CudaKernelProvider {
/// Download a single column from GPU to host as `Vec<T>`.
///
/// Replaces: `download_column_u32`, `download_column_u64`,
/// `download_column_i32`, `download_column_i64`, `download_column_f32`,
/// `download_column_f64`, `download_column_u8`, `download_column_bool`.
///
/// Increments `d2h_transfer_count` (the per-call ILP-style counter)
/// and is checked by the strict deterministic-Datalog D2H guard when
/// it is enabled (see `enable_strict_deterministic_d2h`).
pub fn download_column<T: GpuScalar>(
&self,
buffer: &CudaBuffer,
col_idx: usize,
) -> Result<Vec<T>> {
// Gate first so a deterministic-strict run fails before any host
// allocation or counter mutation. Zero-row downloads are a no-op
// in `download_column_inner` and never issue a D2H transfer, so
// the gate must not fire for them. The row count we resolve here
// is threaded into the inner helper so it does not look it up
// again — the cache makes a second call cheap, but the explicit
// hand-off keeps the contract clear.
let num_rows = self.device_row_count(buffer)?;
if num_rows > 0 {
self.gate_column_download::<T>("download_column", num_rows)?;
self.d2h_transfer_count.fetch_add(1, Ordering::Relaxed);
}
self.download_column_inner_with_rows::<T>(buffer, col_idx, num_rows)
}
/// Download a column as `Vec<T>` while leaving the per-call
/// `d2h_transfer_count` (the ILP-style counter) untouched. The copy is
/// still issued through `dtoh_sync_copy_into_tracked`, so it is recorded
/// in `transfer_tracker` for byte/call profiling stats.
///
/// "Untracked" refers only to `d2h_transfer_count`: the strict
/// deterministic-Datalog D2H guard IS still consulted when it is enabled.
/// Use `dtoh_scalar_untracked` for metadata reads that must remain
/// allowed under the gate.
///
/// Returns an empty vector for a zero-row buffer; fails with
/// "column not found" when `col_idx` is not a column of `buffer`.
///
/// Replaces: `download_f64_untracked` (now generic over `T`).
pub fn download_column_untracked<T: GpuScalar>(
    &self,
    buffer: &CudaBuffer,
    col_idx: usize,
) -> Result<Vec<T>> {
    // Operation label shared by every error and by the gate probe.
    const OP: &str = "download_column_untracked";

    let column = buffer
        .column(col_idx)
        .ok_or_else(|| XlogError::kernel_ctx(OP, "column not found", &col_idx))?;

    let row_count = self.device_row_count(buffer)?;
    if row_count == 0 {
        // Nothing to copy, so the gate is not consulted.
        return Ok(Vec::new());
    }

    // Probe the gate before allocating host memory, as `download_column`
    // does. The tracked copy below would gate as well, but stopping here
    // is earlier and reports a more specific op label.
    self.gate_column_download::<T>(OP, row_count)?;

    let byte_len = row_count
        .checked_mul(T::BYTE_WIDTH)
        .ok_or_else(|| XlogError::kernel_ctx(OP, "byte size overflow", &row_count))?;

    let device_view = self.column_bytes_view(column, byte_len)?;
    let mut host_bytes = vec![0u8; byte_len];
    self.dtoh_sync_copy_into_tracked(&device_view, &mut host_bytes)?;

    // Decode the little-endian payload one scalar at a time.
    let decoded: Vec<T> = host_bytes
        .chunks_exact(T::BYTE_WIDTH)
        .map(|chunk| T::from_le_bytes(chunk))
        .collect();
    Ok(decoded)
}
/// Core of the tracked column download. The caller supplies `num_rows`
/// so the row count is not resolved a second time here.
///
/// The copy goes through the provider's tracked D2H chokepoint
/// (`dtoh_sync_copy_into_tracked`) so transfer-budget traces observe
/// column downloads. This helper neither probes the strict
/// deterministic-D2H gate early nor increments `d2h_transfer_count`;
/// both are the caller's job and are assumed to have happened already.
///
/// Returns an empty vector when `num_rows` is zero. Fails with
/// "column not found" for an unknown `col_idx`, "byte size overflow" when
/// `num_rows * T::BYTE_WIDTH` does not fit in `usize`, and
/// "dtoh copy failed" when the device-to-host copy reports an error.
fn download_column_inner_with_rows<T: GpuScalar>(
    &self,
    buffer: &CudaBuffer,
    col_idx: usize,
    num_rows: usize,
) -> Result<Vec<T>> {
    // Errors are reported under the public entry point's name.
    const OP: &str = "download_column";

    let column = buffer
        .column(col_idx)
        .ok_or_else(|| XlogError::kernel_ctx(OP, "column not found", &col_idx))?;

    if num_rows == 0 {
        return Ok(Vec::new());
    }

    let byte_len = num_rows
        .checked_mul(T::BYTE_WIDTH)
        .ok_or_else(|| XlogError::kernel_ctx(OP, "byte size overflow", &num_rows))?;

    let device_view = self.column_bytes_view(column, byte_len)?;
    let mut host_bytes = vec![0u8; byte_len];
    self.dtoh_sync_copy_into_tracked(&device_view, &mut host_bytes)
        .map_err(|err| XlogError::kernel_ctx(OP, "dtoh copy failed", &err))?;

    // Decode the little-endian payload one scalar at a time.
    let decoded: Vec<T> = host_bytes
        .chunks_exact(T::BYTE_WIDTH)
        .map(|chunk| T::from_le_bytes(chunk))
        .collect();
    Ok(decoded)
}
/// Upload a typed host slice as a single-column GPU buffer.
///
/// Each element is serialized little-endian into a host staging buffer,
/// which is then copied into a freshly allocated device column via
/// `htod_sync_copy_into_tracked`. The resulting buffer has `data.len()`
/// rows and carries the supplied `schema`.
///
/// Fails with "byte size overflow" when `data.len() * T::BYTE_WIDTH` does
/// not fit in `usize`, with "htod copy failed" when the upload reports an
/// error, and with whatever the allocator or `buffer_from_columns` return.
///
/// Replaces: `create_buffer_from_u32_slice`, `create_buffer_from_u64_slice`,
/// `create_buffer_from_i32_slice`, `create_buffer_from_i64_slice`,
/// `create_buffer_from_f32_slice`, `create_buffer_from_f64_slice`,
/// `create_buffer_from_u8_slice`.
pub fn create_buffer_from_slice<T: GpuScalar>(
    &self,
    data: &[T],
    schema: Schema,
) -> Result<CudaBuffer> {
    const OP: &str = "create_buffer_from_slice";

    let row_count = data.len();
    let width = T::BYTE_WIDTH;
    let byte_len = row_count
        .checked_mul(width)
        .ok_or_else(|| XlogError::kernel_ctx(OP, "byte size overflow", &row_count))?;

    // Serialize every scalar into its own `width`-byte slot. The cursor
    // cannot overflow: it never exceeds `byte_len`, which was checked above.
    let mut staging = vec![0u8; byte_len];
    let mut start = 0usize;
    for value in data {
        let end = start + width;
        value.to_le_bytes_into(&mut staging[start..end]);
        start = end;
    }

    let mut device_col = self.memory.alloc::<u8>(staging.len())?;
    self.htod_sync_copy_into_tracked(&staging, &mut device_col)
        .map_err(|err| XlogError::kernel_ctx(OP, "htod copy failed", &err))?;

    self.buffer_from_columns(vec![device_col.into()], row_count as u64, schema)
}
/// Ask the deterministic-D2H gate whether a column download of `num_rows`
/// scalars of type `T` may proceed, by forwarding the byte size to
/// `check_deterministic_d2h` under the label `op`.
///
/// `download_column` and `download_column_untracked` call this so the gate
/// can reject the transfer before any host buffer is allocated or counter
/// mutated, mirroring the chokepoint in `dtoh_sync_copy_into_tracked`.
/// Per the gate's contract, a rejection is an `Err` and also bumps the
/// violation counter when the gate is enabled.
///
/// Also fails with "byte size overflow" when `num_rows * T::BYTE_WIDTH`
/// does not fit in `usize`.
fn gate_column_download<T: GpuScalar>(&self, op: &'static str, num_rows: usize) -> Result<()> {
    let overflow = || XlogError::kernel_ctx(op, "byte size overflow", &num_rows);
    let byte_len = num_rows.checked_mul(T::BYTE_WIDTH).ok_or_else(overflow)?;
    self.check_deterministic_d2h(op, byte_len as u64)
}
}