use cudarc::driver::sys;
use xlog_core::{Result, Schema, XlogError};
use crate::cuda_compat::DeviceSlice;
use crate::device_runtime::StreamId;
use crate::launch::LaunchRecorder;
use crate::memory::{CudaColumn, TrackedCudaSlice};
use crate::CudaBuffer;
use super::CudaKernelProvider;
/// Column-projection helpers that copy device columns into freshly allocated
/// buffers and declare every access on a [`LaunchRecorder`].
impl CudaKernelProvider {
    /// Builds a new two-column [`CudaBuffer`] holding `src`'s columns in
    /// reverse order.
    ///
    /// Column 1 of `src` is copied into output column 0 and column 0 into
    /// output column 1 with `cuMemcpyDtoDAsync_v2` on the stream that
    /// `launch_stream` resolves to; the device-side row counter is copied the
    /// same way. All accesses are declared on a recorder created with
    /// `LaunchRecorder::new_strict`, which is preflighted before anything is
    /// enqueued and committed once all three copies are queued.
    ///
    /// The output schema contains the two source column definitions swapped,
    /// with swapped sort labels (`"col1"` / `"col0"` stand in for a missing
    /// label). The output reuses `src.row_cap` and, when `src` has a cached
    /// row count, forwards it.
    ///
    /// When `src.row_cap` is zero the result of `create_empty_buffer` for the
    /// swapped schema is returned and nothing is allocated or copied here.
    ///
    /// # Errors
    ///
    /// Returns [`XlogError::Kernel`] when the memory manager has no runtime,
    /// when `launch_stream` does not resolve, when `src` is not 2-column, when
    /// the recorder's preflight or commit fails, or when a copy cannot be
    /// enqueued. Errors from `self.memory.alloc` and `create_empty_buffer` are
    /// propagated. After a failed enqueue or commit the stream is synchronized
    /// (its result is ignored) before the error is returned.
    ///
    /// # Panics
    ///
    /// Panics if `with_sort_labels` does not succeed or if `src.column(0)` /
    /// `src.column(1)` yields nothing despite the arity check.
    pub fn wcoj_project_2col_swap_recorded(
        &self,
        src: &CudaBuffer,
        launch_stream: StreamId,
    ) -> Result<CudaBuffer> {
        let runtime = self.memory().runtime().ok_or_else(|| {
            XlogError::Kernel(String::from(
                "wcoj_project_2col_swap_recorded requires a runtime-backed \
                 GpuMemoryManager (constructed via with_runtime)",
            ))
        })?;
        let cu_stream = runtime
            .stream_pool()
            .resolve(launch_stream)
            .ok_or_else(|| {
                XlogError::Kernel(format!(
                    "wcoj_project_2col_swap_recorded: launch_stream StreamId({}) does not resolve",
                    launch_stream.0
                ))
            })?;

        // Only exactly two columns can be swapped.
        let src_arity = src.arity();
        if src_arity != 2 {
            return Err(XlogError::Kernel(format!(
                "wcoj_project_2col_swap_recorded: src must be 2-column, got arity {}",
                src_arity
            )));
        }

        // Output schema: column definitions and sort labels in reverse order.
        let first_label = src
            .schema
            .column_sort_label(1)
            .unwrap_or("col1")
            .to_string();
        let second_label = src
            .schema
            .column_sort_label(0)
            .unwrap_or("col0")
            .to_string();
        let swapped_columns = vec![
            src.schema.columns[1].clone(),
            src.schema.columns[0].clone(),
        ];
        let swapped_schema = Schema::new(swapped_columns)
            .with_sort_labels(vec![first_label, second_label])
            .expect("swapped sort labels match schema arity");

        // A zero-capacity source needs no device work.
        if src.row_cap == 0 {
            return self.create_empty_buffer(swapped_schema);
        }

        let src_col0 = src.column(0).expect("src.col0");
        let src_col1 = src.column(1).expect("src.col1");
        let bytes_col0 = src_col0.len();
        let bytes_col1 = src_col1.len();

        // Each destination is sized after the source column it will receive.
        let dst_first: TrackedCudaSlice<u8> = self.memory.alloc::<u8>(bytes_col1)?;
        let dst_second: TrackedCudaSlice<u8> = self.memory.alloc::<u8>(bytes_col0)?;
        let dst_num_rows: TrackedCudaSlice<u32> = self.memory.alloc::<u32>(1)?;

        // Declare every buffer the copies touch before enqueueing anything.
        let mut rec = LaunchRecorder::new_strict(launch_stream);
        rec.read_column(src.column(0).expect("src.col0"));
        rec.read_column(src.column(1).expect("src.col1"));
        rec.read(src.num_rows_device());
        rec.write(&dst_first);
        rec.write(&dst_second);
        rec.write(&dst_num_rows);
        if let Err(e) = rec.preflight(runtime) {
            return Err(XlogError::Kernel(format!(
                "wcoj_project_2col_swap_recorded: launch recorder preflight failed: {}",
                e
            )));
        }

        let enqueue_outcome: Result<()> = (|| {
            // SAFETY: `dst_first` was allocated above with `bytes_col1` elements
            // of `u8`, the length reported by source column 1, so the copy stays
            // inside both buffers (assumes `len()` is a byte length — TODO
            // confirm). Both buffers are alive for the duration of this call.
            let status = unsafe {
                sys::cuMemcpyDtoDAsync_v2(
                    *dst_first.device_ptr(),
                    *src_col1.device_ptr(),
                    bytes_col1,
                    cu_stream.cu_stream(),
                )
            };
            if status != sys::cudaError_enum::CUDA_SUCCESS {
                return Err(XlogError::Kernel(format!(
                    "wcoj_project_2col_swap_recorded: dtod col1 → new_col0 failed: {:?}",
                    status
                )));
            }

            // SAFETY: same reasoning as above, with `dst_second` sized from
            // source column 0.
            let status = unsafe {
                sys::cuMemcpyDtoDAsync_v2(
                    *dst_second.device_ptr(),
                    *src_col0.device_ptr(),
                    bytes_col0,
                    cu_stream.cu_stream(),
                )
            };
            if status != sys::cudaError_enum::CUDA_SUCCESS {
                return Err(XlogError::Kernel(format!(
                    "wcoj_project_2col_swap_recorded: dtod col0 → new_col1 failed: {:?}",
                    status
                )));
            }

            // SAFETY: `dst_num_rows` holds exactly one `u32` (allocated above);
            // the source row counter is assumed to hold at least one `u32` —
            // TODO confirm.
            let status = unsafe {
                sys::cuMemcpyDtoDAsync_v2(
                    *dst_num_rows.device_ptr(),
                    *src.num_rows_device().device_ptr(),
                    std::mem::size_of::<u32>(),
                    cu_stream.cu_stream(),
                )
            };
            if status != sys::cudaError_enum::CUDA_SUCCESS {
                return Err(XlogError::Kernel(format!(
                    "wcoj_project_2col_swap_recorded: dtod num_rows_device failed: {:?}",
                    status
                )));
            }

            Ok(())
        })();

        // Some copies may already be queued; wait for the stream before the
        // destination buffers go out of scope on the error path.
        if let Err(enqueue_err) = enqueue_outcome {
            let _ = cu_stream.synchronize();
            return Err(enqueue_err);
        }

        if let Err(e) = rec.commit(runtime) {
            let _ = cu_stream.synchronize();
            return Err(XlogError::Kernel(format!(
                "wcoj_project_2col_swap_recorded: launch recorder commit failed: {}",
                e
            )));
        }

        let columns: Vec<CudaColumn> = vec![dst_first.into(), dst_second.into()];
        // Forward the cached host-side row count when the source has one.
        let swapped = if let Some(host_count) = src.cached_row_count() {
            CudaBuffer::from_columns_with_host_count(
                columns,
                src.row_cap,
                dst_num_rows,
                swapped_schema,
                host_count,
            )
        } else {
            CudaBuffer::from_columns(columns, src.row_cap, dst_num_rows, swapped_schema)
        };
        Ok(swapped)
    }

    /// Builds a new [`CudaBuffer`] whose column `i` is a copy of `src` column
    /// `perm[i]`, described by `head_schema`.
    ///
    /// One destination buffer is allocated per entry of `perm`, sized after the
    /// source column it receives, plus a single-`u32` row counter. Copies are
    /// enqueued with `cuMemcpyDtoDAsync_v2` on the stream that `launch_stream`
    /// resolves to. Every column of `src` (not only those named in `perm`) and
    /// its device row counter are declared as reads, and every new buffer as a
    /// write, on a recorder created with `LaunchRecorder::new_strict`; the
    /// recorder is preflighted before the copies and committed after them.
    ///
    /// The output reuses `src.row_cap` and, when `src` has a cached row count,
    /// forwards it. When `src.row_cap` is zero the result of
    /// `create_empty_buffer(head_schema)` is returned instead.
    ///
    /// # Errors
    ///
    /// Returns [`XlogError::Kernel`] when the memory manager has no runtime,
    /// when `launch_stream` does not resolve, when `perm.len()` differs from
    /// `head_schema.arity()`, when an entry of `perm` is not a valid column
    /// index of `src`, when the recorder's preflight or commit fails, or when
    /// a copy cannot be enqueued. Errors from `self.memory.alloc` and
    /// `create_empty_buffer` are propagated. After a failed enqueue or commit
    /// the stream is synchronized (its result is ignored) before returning.
    ///
    /// # Panics
    ///
    /// Panics if `src.column(i)` yields nothing for an index below
    /// `src.arity()`.
    pub fn wcoj_project_output_columns_recorded(
        &self,
        src: &CudaBuffer,
        perm: &[usize],
        head_schema: Schema,
        launch_stream: StreamId,
    ) -> Result<CudaBuffer> {
        let runtime = self.memory().runtime().ok_or_else(|| {
            XlogError::Kernel(String::from(
                "wcoj_project_output_columns_recorded requires a runtime-backed \
                 GpuMemoryManager (constructed via with_runtime)",
            ))
        })?;
        let cu_stream = runtime
            .stream_pool()
            .resolve(launch_stream)
            .ok_or_else(|| {
                XlogError::Kernel(format!(
                    "wcoj_project_output_columns_recorded: launch_stream StreamId({}) does not resolve",
                    launch_stream.0
                ))
            })?;

        // One permutation entry per output column.
        let head_arity = head_schema.arity();
        if perm.len() != head_arity {
            return Err(XlogError::Kernel(format!(
                "wcoj_project_output_columns_recorded: perm len {} must equal head_schema arity {}",
                perm.len(),
                head_arity
            )));
        }

        // Report the first permutation entry that is not a valid source column.
        let src_arity = src.arity();
        let first_invalid = perm
            .iter()
            .enumerate()
            .find(|&(_, &col)| col >= src_arity);
        if let Some((pos, &col)) = first_invalid {
            return Err(XlogError::Kernel(format!(
                "wcoj_project_output_columns_recorded: perm[{}] = {} out of bounds (src arity {})",
                pos, col, src_arity
            )));
        }

        // A zero-capacity source needs no device work.
        if src.row_cap == 0 {
            return self.create_empty_buffer(head_schema);
        }

        // Destination `i` is sized after source column `perm[i]`.
        let mut dst_columns: Vec<TrackedCudaSlice<u8>> = Vec::with_capacity(perm.len());
        for &src_idx in perm {
            let byte_len = src.column(src_idx).expect("src column").len();
            dst_columns.push(self.memory.alloc::<u8>(byte_len)?);
        }
        let dst_num_rows: TrackedCudaSlice<u32> = self.memory.alloc::<u32>(1)?;

        // Declare all source columns as reads and all new buffers as writes.
        let mut rec = LaunchRecorder::new_strict(launch_stream);
        for col_idx in 0..src_arity {
            rec.read_column(src.column(col_idx).expect("src.col"));
        }
        rec.read(src.num_rows_device());
        for dst in &dst_columns {
            rec.write(dst);
        }
        rec.write(&dst_num_rows);
        if let Err(e) = rec.preflight(runtime) {
            return Err(XlogError::Kernel(format!(
                "wcoj_project_output_columns_recorded: launch recorder preflight failed: {}",
                e
            )));
        }

        let enqueue_outcome: Result<()> = (|| {
            // `dst_columns` was built with one entry per element of `perm`, so
            // zipping pairs each destination with its source index.
            for (out_idx, (&src_idx, dst)) in perm.iter().zip(dst_columns.iter()).enumerate() {
                let src_col = src.column(src_idx).expect("src column");
                let byte_len = src_col.len();
                // SAFETY: `dst` was allocated above with the length reported by
                // this same source column, so the copy stays inside both
                // buffers (assumes `len()` is a byte length — TODO confirm).
                // Both buffers are alive for the duration of this call.
                let status = unsafe {
                    sys::cuMemcpyDtoDAsync_v2(
                        *dst.device_ptr(),
                        *src_col.device_ptr(),
                        byte_len,
                        cu_stream.cu_stream(),
                    )
                };
                if status != sys::cudaError_enum::CUDA_SUCCESS {
                    return Err(XlogError::Kernel(format!(
                        "wcoj_project_output_columns_recorded: dtod perm[{}] = src col {} failed: {:?}",
                        out_idx, src_idx, status
                    )));
                }
            }

            // SAFETY: `dst_num_rows` holds exactly one `u32` (allocated above);
            // the source row counter is assumed to hold at least one `u32` —
            // TODO confirm.
            let status = unsafe {
                sys::cuMemcpyDtoDAsync_v2(
                    *dst_num_rows.device_ptr(),
                    *src.num_rows_device().device_ptr(),
                    std::mem::size_of::<u32>(),
                    cu_stream.cu_stream(),
                )
            };
            if status != sys::cudaError_enum::CUDA_SUCCESS {
                return Err(XlogError::Kernel(format!(
                    "wcoj_project_output_columns_recorded: dtod num_rows_device failed: {:?}",
                    status
                )));
            }

            Ok(())
        })();

        // Some copies may already be queued; wait for the stream before the
        // destination buffers go out of scope on the error path.
        if let Err(enqueue_err) = enqueue_outcome {
            let _ = cu_stream.synchronize();
            return Err(enqueue_err);
        }

        if let Err(e) = rec.commit(runtime) {
            let _ = cu_stream.synchronize();
            return Err(XlogError::Kernel(format!(
                "wcoj_project_output_columns_recorded: launch recorder commit failed: {}",
                e
            )));
        }

        let columns: Vec<CudaColumn> = dst_columns
            .into_iter()
            .map(|slice| slice.into())
            .collect();
        // Forward the cached host-side row count when the source has one.
        let projected = if let Some(host_count) = src.cached_row_count() {
            CudaBuffer::from_columns_with_host_count(
                columns,
                src.row_cap,
                dst_num_rows,
                head_schema,
                host_count,
            )
        } else {
            CudaBuffer::from_columns(columns, src.row_cap, dst_num_rows, head_schema)
        };
        Ok(projected)
    }
}