use std::ffi::c_void;
use std::time::Instant;
use cudarc::driver::sys;
use xlog_core::{Result, ScalarType, Schema, XlogError};
use super::{wcoj_kernels, CudaKernelProvider, WCOJ_MODULE};
use crate::device_runtime::StreamId;
use crate::launch::LaunchRecorder;
use crate::memory::{CudaColumn, TrackedCudaSlice};
use crate::wcoj_metadata::WcojRelationMetadata;
use crate::CudaBuffer;
use crate::{AsKernelParam, LaunchAsync, LaunchConfig};
/// Threads per block (x dimension) used for the kernel launches in this file.
const BLOCK_SIZE: u32 = 256;
/// Borrows column `col_idx` of `input` reinterpreted as a slice of `u32`.
///
/// # Errors
/// Returns `XlogError::Kernel` when the column does not exist or is not the
/// `CudaColumn::Owned` variant.
fn column_u32(input: &CudaBuffer, col_idx: usize) -> Result<&TrackedCudaSlice<u32>> {
    let Some(column) = input.column(col_idx) else {
        return Err(XlogError::Kernel(format!(
            "wcoj_layout_u32_recorded: column {col_idx} not found"
        )));
    };
    let CudaColumn::Owned(bytes) = column else {
        return Err(XlogError::Kernel(
            "wcoj_layout_u32_recorded: input column must be owned".to_string(),
        ));
    };
    // SAFETY: NOTE(review) — this reinterprets the byte-typed slice handle as a
    // `u32`-typed handle. It presumes `TrackedCudaSlice<T>` has the same layout
    // for every `T` and that the caller has checked the column's element type;
    // confirm against the `TrackedCudaSlice` definition.
    let typed = bytes as *const TrackedCudaSlice<u8> as *const TrackedCudaSlice<u32>;
    Ok(unsafe { &*typed })
}
/// Borrows column `col_idx` of `input` reinterpreted as a slice of `u64`.
///
/// # Errors
/// Returns `XlogError::Kernel` when the column does not exist or is not the
/// `CudaColumn::Owned` variant.
fn column_u64(input: &CudaBuffer, col_idx: usize) -> Result<&TrackedCudaSlice<u64>> {
    match input.column(col_idx) {
        None => Err(XlogError::Kernel(format!(
            "wcoj_layout_u64_recorded: column {col_idx} not found"
        ))),
        Some(CudaColumn::Owned(bytes)) => {
            // SAFETY: NOTE(review) — this reinterprets the byte-typed slice
            // handle as a `u64`-typed handle. It presumes `TrackedCudaSlice<T>`
            // has the same layout for every `T` and that the caller has checked
            // the column's element type; confirm against the type definition.
            let typed = bytes as *const TrackedCudaSlice<u8> as *const TrackedCudaSlice<u64>;
            Ok(unsafe { &*typed })
        }
        Some(_) => Err(XlogError::Kernel(
            "wcoj_layout_u64_recorded: input column must be owned".to_string(),
        )),
    }
}
impl CudaKernelProvider {
/// Lays out a 2-column `U32`/`Symbol` relation on `launch_stream`.
///
/// First tries `try_wcoj_layout_fast_path_u32`; when that yields a buffer the
/// fast-path hit is recorded and the buffer returned. Otherwise (no result, or
/// the fast path failed) the call falls back to `dedup_full_row_recorded`.
///
/// # Errors
/// Returns `XlogError::Kernel` when the memory manager has no runtime, the
/// input arity is not 2, or a column type is missing or not `U32`/`Symbol`.
/// Errors from the fallback are propagated.
pub fn wcoj_layout_u32_recorded(
    &self,
    input: &CudaBuffer,
    launch_stream: StreamId,
) -> Result<CudaBuffer> {
    if self.memory().runtime().is_none() {
        return Err(XlogError::Kernel(
            "wcoj_layout_u32_recorded requires a runtime-backed \
             GpuMemoryManager (constructed via with_runtime)"
                .to_string(),
        ));
    }
    let arity = input.arity();
    if arity != 2 {
        return Err(XlogError::Kernel(format!(
            "wcoj_layout_u32_recorded: input must be 2-column, got arity {}",
            arity
        )));
    }
    for col_idx in 0..2 {
        let Some(ty) = input.schema.column_type(col_idx) else {
            return Err(XlogError::Kernel(format!(
                "wcoj_layout_u32_recorded: column {} type missing",
                col_idx
            )));
        };
        let is_four_byte = matches!(ty, ScalarType::U32 | ScalarType::Symbol);
        if !is_four_byte {
            return Err(XlogError::Kernel(format!(
                "wcoj_layout_u32_recorded: column {} must be U32 or Symbol, got {:?}",
                col_idx, ty
            )));
        }
    }
    // Best effort: a fast-path miss or a fast-path error both fall through to
    // the general path below.
    if let Ok(Some(out)) = self.try_wcoj_layout_fast_path_u32(input, launch_stream) {
        self.record_wcoj_layout_fast_path_hit();
        return Ok(out);
    }
    self.dedup_full_row_recorded(input, launch_stream)
}
/// Runs `dedup_full_row_recorded` on a relation whose columns are all
/// `U32`/`Symbol`, after recording a layout-sort invocation.
///
/// The invocation is recorded before any validation takes place.
///
/// # Errors
/// Returns `XlogError::Kernel` when the memory manager has no runtime, the
/// arity is below 2, or a column type is missing or not `U32`/`Symbol`.
pub fn wcoj_layout_sort_u32_recorded(
    &self,
    input: &CudaBuffer,
    launch_stream: StreamId,
) -> Result<CudaBuffer> {
    self.record_wcoj_layout_sort_invocation();
    if self.memory().runtime().is_none() {
        return Err(XlogError::Kernel(
            "wcoj_layout_sort_u32_recorded requires a runtime-backed \
             GpuMemoryManager (constructed via with_runtime)"
                .to_string(),
        ));
    }
    let arity = input.arity();
    if arity < 2 {
        return Err(XlogError::Kernel(format!(
            "wcoj_layout_sort_u32_recorded: input must have arity >= 2, got {}",
            arity
        )));
    }
    for col_idx in 0..arity {
        match input.schema.column_type(col_idx) {
            None => {
                return Err(XlogError::Kernel(format!(
                    "wcoj_layout_sort_u32_recorded: column {} type missing",
                    col_idx
                )));
            }
            Some(ScalarType::U32 | ScalarType::Symbol) => {}
            Some(ty) => {
                return Err(XlogError::Kernel(format!(
                    "wcoj_layout_sort_u32_recorded: column {} must be U32 or Symbol \
                     (4-byte width-class), got {:?}",
                    col_idx, ty
                )));
            }
        }
    }
    self.dedup_full_row_recorded(input, launch_stream)
}
/// Convenience wrapper around `wcoj_triangle_hg_u32_recorded` that passes
/// `WCOJ_HG_BLOCK_WORK_UNIT_DEFAULT` as the block work unit.
pub fn wcoj_triangle_u32_recorded(
    &self,
    e_xy: &CudaBuffer,
    e_yz: &CudaBuffer,
    e_xz: &CudaBuffer,
    launch_stream: StreamId,
) -> Result<CudaBuffer> {
    let block_work_unit = crate::wcoj_metadata::WCOJ_HG_BLOCK_WORK_UNIT_DEFAULT;
    self.wcoj_triangle_hg_u32_recorded(e_xy, e_yz, e_xz, block_work_unit, launch_stream)
}
/// Convenience wrapper around `wcoj_4cycle_hg_u32_recorded` that passes
/// `WCOJ_HG_BLOCK_WORK_UNIT_DEFAULT` as the block work unit.
pub fn wcoj_4cycle_u32_recorded(
    &self,
    e1: &CudaBuffer,
    e2: &CudaBuffer,
    e3: &CudaBuffer,
    e4: &CudaBuffer,
    launch_stream: StreamId,
) -> Result<CudaBuffer> {
    let block_work_unit = crate::wcoj_metadata::WCOJ_HG_BLOCK_WORK_UNIT_DEFAULT;
    self.wcoj_4cycle_hg_u32_recorded(e1, e2, e3, e4, block_work_unit, launch_stream)
}
/// Returns the row count of `buf` as a `u32`.
///
/// Uses the buffer's cached row count when one is available; otherwise reads
/// element 0 of the buffer's device-side row counter via
/// `dtoh_scalar_untracked`.
fn logical_row_count_u32(&self, buf: &CudaBuffer) -> Result<u32> {
    match buf.cached_row_count() {
        Some(cached) => Ok(cached),
        None => self.dtoh_scalar_untracked::<u32>(buf.num_rows_device(), 0),
    }
}
/// Lays out a 2-column `U64` relation on `launch_stream`.
///
/// First tries `try_wcoj_layout_fast_path_u64`; when that yields a buffer the
/// fast-path hit is recorded and the buffer returned. Otherwise (no result, or
/// the fast path failed) the call falls back to `dedup_full_row_recorded`.
///
/// # Errors
/// Returns `XlogError::Kernel` when the memory manager has no runtime, the
/// input arity is not 2, or a column type is missing or not `U64`. Errors from
/// the fallback are propagated.
pub fn wcoj_layout_u64_recorded(
    &self,
    input: &CudaBuffer,
    launch_stream: StreamId,
) -> Result<CudaBuffer> {
    if self.memory().runtime().is_none() {
        return Err(XlogError::Kernel(
            "wcoj_layout_u64_recorded requires a runtime-backed \
             GpuMemoryManager (constructed via with_runtime)"
                .to_string(),
        ));
    }
    let arity = input.arity();
    if arity != 2 {
        return Err(XlogError::Kernel(format!(
            "wcoj_layout_u64_recorded: input must be 2-column, got arity {}",
            arity
        )));
    }
    for col_idx in 0..2 {
        match input.schema.column_type(col_idx) {
            None => {
                return Err(XlogError::Kernel(format!(
                    "wcoj_layout_u64_recorded: column {} type missing",
                    col_idx
                )));
            }
            Some(ScalarType::U64) => {}
            Some(ty) => {
                return Err(XlogError::Kernel(format!(
                    "wcoj_layout_u64_recorded: column {} must be U64, got {:?}",
                    col_idx, ty
                )));
            }
        }
    }
    match self.try_wcoj_layout_fast_path_u64(input, launch_stream) {
        Ok(Some(out)) => {
            self.record_wcoj_layout_fast_path_hit();
            Ok(out)
        }
        // Best effort: a miss or a fast-path error both use the general path.
        Ok(None) | Err(_) => self.dedup_full_row_recorded(input, launch_stream),
    }
}
/// Runs `dedup_full_row_recorded` on a relation whose columns are all `U64`,
/// after recording a layout-sort invocation.
///
/// The invocation is recorded before any validation takes place.
///
/// # Errors
/// Returns `XlogError::Kernel` when the memory manager has no runtime, the
/// arity is below 2, or a column type is missing or not `U64`.
pub fn wcoj_layout_sort_u64_recorded(
    &self,
    input: &CudaBuffer,
    launch_stream: StreamId,
) -> Result<CudaBuffer> {
    self.record_wcoj_layout_sort_invocation();
    if self.memory().runtime().is_none() {
        return Err(XlogError::Kernel(
            "wcoj_layout_sort_u64_recorded requires a runtime-backed \
             GpuMemoryManager (constructed via with_runtime)"
                .to_string(),
        ));
    }
    let arity = input.arity();
    if arity < 2 {
        return Err(XlogError::Kernel(format!(
            "wcoj_layout_sort_u64_recorded: input must have arity >= 2, got {}",
            arity
        )));
    }
    for col_idx in 0..arity {
        let Some(ty) = input.schema.column_type(col_idx) else {
            return Err(XlogError::Kernel(format!(
                "wcoj_layout_sort_u64_recorded: column {} type missing",
                col_idx
            )));
        };
        let is_eight_byte = matches!(ty, ScalarType::U64);
        if !is_eight_byte {
            return Err(XlogError::Kernel(format!(
                "wcoj_layout_sort_u64_recorded: column {} must be U64 \
                 (8-byte width-class), got {:?}",
                col_idx, ty
            )));
        }
    }
    self.dedup_full_row_recorded(input, launch_stream)
}
/// Convenience wrapper around `wcoj_triangle_hg_u64_recorded` that passes
/// `WCOJ_HG_BLOCK_WORK_UNIT_DEFAULT` as the block work unit.
pub fn wcoj_triangle_u64_recorded(
    &self,
    e_xy: &CudaBuffer,
    e_yz: &CudaBuffer,
    e_xz: &CudaBuffer,
    launch_stream: StreamId,
) -> Result<CudaBuffer> {
    let block_work_unit = crate::wcoj_metadata::WCOJ_HG_BLOCK_WORK_UNIT_DEFAULT;
    self.wcoj_triangle_hg_u64_recorded(e_xy, e_yz, e_xz, block_work_unit, launch_stream)
}
/// Convenience wrapper around `wcoj_4cycle_hg_u64_recorded` that passes
/// `WCOJ_HG_BLOCK_WORK_UNIT_DEFAULT` as the block work unit.
pub fn wcoj_4cycle_u64_recorded(
    &self,
    e1: &CudaBuffer,
    e2: &CudaBuffer,
    e3: &CudaBuffer,
    e4: &CudaBuffer,
    launch_stream: StreamId,
) -> Result<CudaBuffer> {
    let block_work_unit = crate::wcoj_metadata::WCOJ_HG_BLOCK_WORK_UNIT_DEFAULT;
    self.wcoj_4cycle_hg_u64_recorded(e1, e2, e3, e4, block_work_unit, launch_stream)
}
}
impl CudaKernelProvider {
/// Fast path for the 2-column 4-byte layout.
///
/// Launches the `WCOJ_LAYOUT_CHECK_SORTED_UNIQUE_U32` kernel over both columns
/// and, when the device flag still reads 1 afterwards, returns a copy of
/// `input` produced by `recorded_clone_2col_4byte`.
///
/// Returns `Ok(None)` when the input has zero rows or the flag is not 1, and
/// `Ok(Some(_))` for a single-row input (cloned without launching the kernel)
/// or when the flag is 1.
///
/// # Errors
/// Returns `XlogError::Kernel` when the runtime or stream cannot be resolved,
/// a column is missing or not owned, the kernel is not found, or recorder
/// preflight/commit, the launch, or the stream sync fails.
fn try_wcoj_layout_fast_path_u32(
&self,
input: &CudaBuffer,
launch_stream: StreamId,
) -> Result<Option<CudaBuffer>> {
let runtime = self
.memory()
.runtime()
.ok_or_else(|| XlogError::Kernel("wcoj_layout fast-path: no runtime".to_string()))?;
let cu_stream = runtime
.stream_pool()
.resolve(launch_stream)
.ok_or_else(|| {
XlogError::Kernel("wcoj_layout fast-path: stream resolve".to_string())
})?;
let n = self.logical_row_count_u32(input)?;
// Empty input: report "no fast-path result" so the caller uses its fallback.
if n == 0 {
return Ok(None);
}
// A single row needs no check kernel; clone it directly.
if n == 1 {
return Ok(Some(self.recorded_clone_2col_4byte(
input,
n,
launch_stream,
&cu_stream,
runtime,
)?));
}
// One-element device flag that the check kernel receives as its last argument.
let mut flag_buf = self.memory.alloc::<u32>(1)?;
let col0 = column_u32(input, 0)?;
let col1 = column_u32(input, 1)?;
let device = self.device.inner();
let kernel = device
.get_func(
WCOJ_MODULE,
wcoj_kernels::WCOJ_LAYOUT_CHECK_SORTED_UNIQUE_U32,
)
.ok_or_else(|| {
XlogError::Kernel("wcoj_layout_check_sorted_unique_u32 kernel not found".into())
})?;
// Register the buffers this launch reads and writes with the recorder, then
// preflight before anything is enqueued on the stream.
let mut rec = LaunchRecorder::new_strict(launch_stream);
rec.read(input.num_rows_device());
rec.read_column(input.column(0).expect("col0"));
rec.read_column(input.column(1).expect("col1"));
rec.write(&flag_buf);
rec.preflight(runtime)
.map_err(|e| XlogError::Kernel(format!("wcoj_layout fast-path: preflight {e}")))?;
let one: u32 = 1;
// One thread per row, rounded up to whole blocks.
let grid = n.div_ceil(BLOCK_SIZE);
// Enqueue the flag initialisation (value 1) followed by the check kernel.
// NOTE(review): presumably the kernel clears the flag when the rows are not
// sorted and unique — confirm against the kernel source.
let queued_result: Result<()> = (|| {
self.htod_launch_metadata_async_copy_one(
&one,
&flag_buf,
&cu_stream,
"wcoj_layout fast-path flag init",
)?;
unsafe {
kernel
.clone()
.launch_on_stream(
&cu_stream,
LaunchConfig {
grid_dim: (grid, 1, 1),
block_dim: (BLOCK_SIZE, 1, 1),
shared_mem_bytes: 0,
},
(col0, col1, n, &mut flag_buf),
)
.map_err(|e| {
XlogError::Kernel(format!(
"wcoj_layout_check_sorted_unique_u32 launch: {e}"
))
})?;
}
Ok(())
})();
// On an enqueue failure, synchronize the stream (result ignored) before
// returning the error.
if let Err(e) = queued_result {
let _ = cu_stream.synchronize();
return Err(e);
}
if let Err(e) = rec.commit(runtime) {
let _ = cu_stream.synchronize();
return Err(XlogError::Kernel(format!(
"wcoj_layout fast-path: commit {e}"
)));
}
// The flag is read back on the host, so the stream is synchronized first.
cu_stream
.synchronize()
.map_err(|e| XlogError::Kernel(format!("wcoj_layout fast-path: sync {e}")))?;
let flag_val = self.dtoh_scalar_untracked::<u32>(&flag_buf, 0)?;
// Flag still 1: return a copy of the input as the fast-path result.
if flag_val == 1 {
Ok(Some(self.recorded_clone_2col_4byte(
input,
n,
launch_stream,
&cu_stream,
runtime,
)?))
} else {
Ok(None)
}
}
/// Fast path for the 2-column 8-byte layout.
///
/// Launches the `WCOJ_LAYOUT_CHECK_SORTED_UNIQUE_U64` kernel over both columns
/// and, when the device flag still reads 1 afterwards, returns a copy of
/// `input` produced by `recorded_clone_2col_8byte`.
///
/// Returns `Ok(None)` when the input has zero rows or the flag is not 1, and
/// `Ok(Some(_))` for a single-row input (cloned without launching the kernel)
/// or when the flag is 1.
///
/// # Errors
/// Returns `XlogError::Kernel` when the runtime or stream cannot be resolved,
/// a column is missing or not owned, the kernel is not found, or recorder
/// preflight/commit, the launch, or the stream sync fails.
fn try_wcoj_layout_fast_path_u64(
&self,
input: &CudaBuffer,
launch_stream: StreamId,
) -> Result<Option<CudaBuffer>> {
let runtime = self
.memory()
.runtime()
.ok_or_else(|| XlogError::Kernel("wcoj_layout fast-path: no runtime".to_string()))?;
let cu_stream = runtime
.stream_pool()
.resolve(launch_stream)
.ok_or_else(|| {
XlogError::Kernel("wcoj_layout fast-path: stream resolve".to_string())
})?;
let n = self.logical_row_count_u32(input)?;
// Empty input: report "no fast-path result" so the caller uses its fallback.
if n == 0 {
return Ok(None);
}
// A single row needs no check kernel; clone it directly.
if n == 1 {
return Ok(Some(self.recorded_clone_2col_8byte(
input,
n,
launch_stream,
&cu_stream,
runtime,
)?));
}
// One-element device flag that the check kernel receives as its last argument.
let mut flag_buf = self.memory.alloc::<u32>(1)?;
let col0 = column_u64(input, 0)?;
let col1 = column_u64(input, 1)?;
let device = self.device.inner();
let kernel = device
.get_func(
WCOJ_MODULE,
wcoj_kernels::WCOJ_LAYOUT_CHECK_SORTED_UNIQUE_U64,
)
.ok_or_else(|| {
XlogError::Kernel("wcoj_layout_check_sorted_unique_u64 kernel not found".into())
})?;
// Register the buffers this launch reads and writes with the recorder, then
// preflight before anything is enqueued on the stream.
let mut rec = LaunchRecorder::new_strict(launch_stream);
rec.read(input.num_rows_device());
rec.read_column(input.column(0).expect("col0"));
rec.read_column(input.column(1).expect("col1"));
rec.write(&flag_buf);
rec.preflight(runtime)
.map_err(|e| XlogError::Kernel(format!("wcoj_layout fast-path u64: preflight {e}")))?;
let one: u32 = 1;
// One thread per row, rounded up to whole blocks.
let grid = n.div_ceil(BLOCK_SIZE);
// Enqueue the flag initialisation (value 1) followed by the check kernel.
// NOTE(review): presumably the kernel clears the flag when the rows are not
// sorted and unique — confirm against the kernel source.
let queued_result: Result<()> = (|| {
self.htod_launch_metadata_async_copy_one(
&one,
&flag_buf,
&cu_stream,
"wcoj_layout fast-path u64 flag init",
)?;
unsafe {
kernel
.clone()
.launch_on_stream(
&cu_stream,
LaunchConfig {
grid_dim: (grid, 1, 1),
block_dim: (BLOCK_SIZE, 1, 1),
shared_mem_bytes: 0,
},
(col0, col1, n, &mut flag_buf),
)
.map_err(|e| {
XlogError::Kernel(format!(
"wcoj_layout_check_sorted_unique_u64 launch: {e}"
))
})?;
}
Ok(())
})();
// On an enqueue failure, synchronize the stream (result ignored) before
// returning the error.
if let Err(e) = queued_result {
let _ = cu_stream.synchronize();
return Err(e);
}
if let Err(e) = rec.commit(runtime) {
let _ = cu_stream.synchronize();
return Err(XlogError::Kernel(format!(
"wcoj_layout fast-path u64: commit {e}"
)));
}
// The flag is read back on the host, so the stream is synchronized first.
cu_stream
.synchronize()
.map_err(|e| XlogError::Kernel(format!("wcoj_layout fast-path u64: sync {e}")))?;
let flag_val = self.dtoh_scalar_untracked::<u32>(&flag_buf, 0)?;
// Flag still 1: return a copy of the input as the fast-path result.
if flag_val == 1 {
Ok(Some(self.recorded_clone_2col_8byte(
input,
n,
launch_stream,
&cu_stream,
runtime,
)?))
} else {
Ok(None)
}
}
/// Copies the first `n` rows of a 2-column, 4-bytes-per-element buffer into
/// freshly allocated device buffers on `cu_stream`.
///
/// Enqueues two asynchronous device-to-device copies (one per column, `n * 4`
/// bytes each) and an asynchronous host-to-device copy of `n` into a new
/// one-element row counter. The stream is synchronized only on the error
/// paths; on success the copies are left enqueued.
///
/// The result carries `input`'s schema and `n` as its host-side row count.
///
/// # Errors
/// Returns `XlogError::Kernel` when an allocation, recorder preflight/commit,
/// or one of the copies fails.
///
/// # Panics
/// Panics when `input` has no column 0 or column 1.
fn recorded_clone_2col_4byte(
&self,
input: &CudaBuffer,
n: u32,
launch_stream: StreamId,
cu_stream: &cudarc::driver::CudaStream,
runtime: &std::sync::Arc<crate::device_runtime::XlogDeviceRuntime>,
) -> Result<CudaBuffer> {
// Bytes per column: n rows of 4-byte elements.
let bpc = (n as usize) * 4;
let out_col0 = self.memory.alloc::<u8>(bpc)?;
let out_col1 = self.memory.alloc::<u8>(bpc)?;
let out_d_num_rows = self.memory.alloc::<u32>(1)?;
let src_col0 = input.column(0).expect("col0");
let src_col1 = input.column(1).expect("col1");
// Register sources as reads and destinations as writes, then preflight
// before anything is enqueued.
let mut rec = LaunchRecorder::new_strict(launch_stream);
rec.read(input.num_rows_device());
rec.read_column(src_col0);
rec.read_column(src_col1);
rec.write(&out_col0);
rec.write(&out_col1);
rec.write(&out_d_num_rows);
rec.preflight(runtime)
.map_err(|e| XlogError::Kernel(format!("wcoj_layout clone 4B: preflight {e}")))?;
// Enqueue both column copies and the row-count upload; the first failure
// short-circuits out of the closure.
let queued_result: Result<()> = (|| {
unsafe {
let r0 = sys::cuMemcpyDtoDAsync_v2(
*out_col0.device_ptr(),
*src_col0.device_ptr(),
bpc,
cu_stream.cu_stream(),
);
if r0 != sys::cudaError_enum::CUDA_SUCCESS {
return Err(XlogError::Kernel(format!(
"wcoj_layout clone 4B: dtod col0 failed: {r0:?}"
)));
}
let r1 = sys::cuMemcpyDtoDAsync_v2(
*out_col1.device_ptr(),
*src_col1.device_ptr(),
bpc,
cu_stream.cu_stream(),
);
if r1 != sys::cudaError_enum::CUDA_SUCCESS {
return Err(XlogError::Kernel(format!(
"wcoj_layout clone 4B: dtod col1 failed: {r1:?}"
)));
}
}
self.htod_launch_metadata_async_copy_one(
&n,
&out_d_num_rows,
cu_stream,
"wcoj_layout clone 4B d_num_rows",
)?;
Ok(())
})();
// On failure, synchronize the stream (result ignored) before returning.
if let Err(e) = queued_result {
let _ = cu_stream.synchronize();
return Err(e);
}
if let Err(e) = rec.commit(runtime) {
let _ = cu_stream.synchronize();
return Err(XlogError::Kernel(format!(
"wcoj_layout clone 4B: commit {e}"
)));
}
Ok(CudaBuffer::from_columns_with_host_count(
vec![out_col0.into(), out_col1.into()],
n as u64,
out_d_num_rows,
input.schema().clone(),
n,
))
}
/// Copies the first `n` rows of a 2-column, 8-bytes-per-element buffer into
/// freshly allocated device buffers on `cu_stream`.
///
/// Enqueues two asynchronous device-to-device copies (one per column, `n * 8`
/// bytes each) and an asynchronous host-to-device copy of `n` into a new
/// one-element row counter. The stream is synchronized only on the error
/// paths; on success the copies are left enqueued.
///
/// The result carries `input`'s schema and `n` as its host-side row count.
///
/// # Errors
/// Returns `XlogError::Kernel` when an allocation, recorder preflight/commit,
/// or one of the copies fails.
///
/// # Panics
/// Panics when `input` has no column 0 or column 1.
fn recorded_clone_2col_8byte(
&self,
input: &CudaBuffer,
n: u32,
launch_stream: StreamId,
cu_stream: &cudarc::driver::CudaStream,
runtime: &std::sync::Arc<crate::device_runtime::XlogDeviceRuntime>,
) -> Result<CudaBuffer> {
// Bytes per column: n rows of 8-byte elements.
let bpc = (n as usize) * 8;
let out_col0 = self.memory.alloc::<u8>(bpc)?;
let out_col1 = self.memory.alloc::<u8>(bpc)?;
let out_d_num_rows = self.memory.alloc::<u32>(1)?;
let src_col0 = input.column(0).expect("col0");
let src_col1 = input.column(1).expect("col1");
// Register sources as reads and destinations as writes, then preflight
// before anything is enqueued.
let mut rec = LaunchRecorder::new_strict(launch_stream);
rec.read(input.num_rows_device());
rec.read_column(src_col0);
rec.read_column(src_col1);
rec.write(&out_col0);
rec.write(&out_col1);
rec.write(&out_d_num_rows);
rec.preflight(runtime)
.map_err(|e| XlogError::Kernel(format!("wcoj_layout clone 8B: preflight {e}")))?;
// Enqueue both column copies and the row-count upload; the first failure
// short-circuits out of the closure.
let queued_result: Result<()> = (|| {
unsafe {
let r0 = sys::cuMemcpyDtoDAsync_v2(
*out_col0.device_ptr(),
*src_col0.device_ptr(),
bpc,
cu_stream.cu_stream(),
);
if r0 != sys::cudaError_enum::CUDA_SUCCESS {
return Err(XlogError::Kernel(format!(
"wcoj_layout clone 8B: dtod col0 failed: {r0:?}"
)));
}
let r1 = sys::cuMemcpyDtoDAsync_v2(
*out_col1.device_ptr(),
*src_col1.device_ptr(),
bpc,
cu_stream.cu_stream(),
);
if r1 != sys::cudaError_enum::CUDA_SUCCESS {
return Err(XlogError::Kernel(format!(
"wcoj_layout clone 8B: dtod col1 failed: {r1:?}"
)));
}
}
self.htod_launch_metadata_async_copy_one(
&n,
&out_d_num_rows,
cu_stream,
"wcoj_layout clone 8B d_num_rows",
)?;
Ok(())
})();
// On failure, synchronize the stream (result ignored) before returning.
if let Err(e) = queued_result {
let _ = cu_stream.synchronize();
return Err(e);
}
if let Err(e) = rec.commit(runtime) {
let _ = cu_stream.synchronize();
return Err(XlogError::Kernel(format!(
"wcoj_layout clone 8B: commit {e}"
)));
}
Ok(CudaBuffer::from_columns_with_host_count(
vec![out_col0.into(), out_col1.into()],
n as u64,
out_d_num_rows,
input.schema().clone(),
n,
))
}
}
/// Element width class that selects between the `u32` and `u64` clique kernels.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
enum CliqueWidthClass {
    /// 4-byte elements (`U32` / `Symbol` columns).
    FourByte,
    /// 8-byte elements (`U64` columns).
    EightByte,
}
impl CliqueWidthClass {
    /// Size in bytes of one element of this width class.
    fn elem_bytes(self) -> usize {
        match self {
            Self::FourByte => 4,
            Self::EightByte => 8,
        }
    }

    /// Short label (`"u32"` or `"u64"`) used in error messages.
    fn label(self) -> &'static str {
        match self {
            Self::FourByte => "u32",
            Self::EightByte => "u64",
        }
    }

    /// Returns `true` when `ty` belongs to this width class: `U32` or `Symbol`
    /// for `FourByte`, `U64` for `EightByte`.
    fn validate_col_type(self, ty: ScalarType) -> bool {
        matches!(
            (self, ty),
            (Self::FourByte, ScalarType::U32 | ScalarType::Symbol)
                | (Self::EightByte, ScalarType::U64)
        )
    }
}
fn clique_kernel_name(k: usize, materialize: bool, w: CliqueWidthClass) -> &'static str {
match (k, materialize, w) {
(5, false, CliqueWidthClass::FourByte) => wcoj_kernels::WCOJ_CLIQUE5_COUNT_HG_U32,
(5, true, CliqueWidthClass::FourByte) => wcoj_kernels::WCOJ_CLIQUE5_MATERIALIZE_HG_U32,
(5, false, CliqueWidthClass::EightByte) => wcoj_kernels::WCOJ_CLIQUE5_COUNT_HG_U64,
(5, true, CliqueWidthClass::EightByte) => wcoj_kernels::WCOJ_CLIQUE5_MATERIALIZE_HG_U64,
(6, false, CliqueWidthClass::FourByte) => wcoj_kernels::WCOJ_CLIQUE6_COUNT_HG_U32,
(6, true, CliqueWidthClass::FourByte) => wcoj_kernels::WCOJ_CLIQUE6_MATERIALIZE_HG_U32,
(6, false, CliqueWidthClass::EightByte) => wcoj_kernels::WCOJ_CLIQUE6_COUNT_HG_U64,
(6, true, CliqueWidthClass::EightByte) => wcoj_kernels::WCOJ_CLIQUE6_MATERIALIZE_HG_U64,
(7, false, CliqueWidthClass::FourByte) => wcoj_kernels::WCOJ_CLIQUE7_COUNT_HG_U32,
(7, true, CliqueWidthClass::FourByte) => wcoj_kernels::WCOJ_CLIQUE7_MATERIALIZE_HG_U32,
(7, false, CliqueWidthClass::EightByte) => wcoj_kernels::WCOJ_CLIQUE7_COUNT_HG_U64,
(7, true, CliqueWidthClass::EightByte) => wcoj_kernels::WCOJ_CLIQUE7_MATERIALIZE_HG_U64,
(8, false, CliqueWidthClass::FourByte) => wcoj_kernels::WCOJ_CLIQUE8_COUNT_HG_U32,
(8, true, CliqueWidthClass::FourByte) => wcoj_kernels::WCOJ_CLIQUE8_MATERIALIZE_HG_U32,
(8, false, CliqueWidthClass::EightByte) => wcoj_kernels::WCOJ_CLIQUE8_COUNT_HG_U64,
(8, true, CliqueWidthClass::EightByte) => wcoj_kernels::WCOJ_CLIQUE8_MATERIALIZE_HG_U64,
_ => panic!("clique_kernel_name: K must be 5..8, got {}", k),
}
}
/// Leader-relation metadata for a clique join, tagged by key width so both the
/// `u32` and `u64` variants can be handled by one code path.
enum CliqueLeaderMetadata {
/// Metadata built over `u32` keys.
U32(WcojRelationMetadata<u32>),
/// Metadata built over `u64` keys.
U64(WcojRelationMetadata<u64>),
}
impl CliqueLeaderMetadata {
    /// Returns the metadata's `total` field narrowed to `u32`.
    ///
    /// # Errors
    /// Returns `XlogError::Kernel` (prefixed with `entry_label`) when `total`
    /// does not fit in a `u32`.
    fn total_rows_u32(&self, entry_label: &str) -> Result<u32> {
        let total = match self {
            Self::U32(md) => md.total,
            Self::U64(md) => md.total,
        };
        match u32::try_from(total) {
            Ok(rows) => Ok(rows),
            Err(_) => Err(XlogError::Kernel(format!(
                "{}: leader metadata total {} exceeds u32 kernel surface",
                entry_label, total
            ))),
        }
    }

    /// Returns the metadata's `key_count` field.
    fn key_count(&self) -> u32 {
        match self {
            Self::U32(md) => md.key_count,
            Self::U64(md) => md.key_count,
        }
    }
}
/// Validates the arguments of a clique metadata build and returns the leader
/// edge buffer.
///
/// Checks, in order: `k` is in `5..=8`; `edges` holds exactly `k * (k - 1) / 2`
/// buffers; `leader_edge_idx` indexes into them; the leader has arity 2; and
/// the leader's column 0 type belongs to `width_class`.
///
/// # Errors
/// Returns `XlogError::Kernel` (prefixed with `entry_label`) for the first
/// check that fails.
fn validate_clique_metadata_leader<'a>(
    k: usize,
    edges: &'a [&CudaBuffer],
    leader_edge_idx: u32,
    width_class: CliqueWidthClass,
    entry_label: &str,
) -> Result<&'a CudaBuffer> {
    if !(5..=8).contains(&k) {
        return Err(XlogError::Kernel(format!(
            "{}: k must be 5..8, got {}",
            entry_label, k
        )));
    }

    let expected_edges = k * (k - 1) / 2;
    let edge_count = edges.len();
    if edge_count != expected_edges {
        return Err(XlogError::Kernel(format!(
            "{}: expected {} edges (= C({}, 2)), got {}",
            entry_label, expected_edges, k, edge_count
        )));
    }

    let leader_slot = match usize::try_from(leader_edge_idx) {
        Ok(idx) if idx < expected_edges => idx,
        _ => {
            return Err(XlogError::Kernel(format!(
                "{}: leader_edge_idx {} out of range for {} edges",
                entry_label, leader_edge_idx, expected_edges
            )));
        }
    };

    let leader = edges[leader_slot];
    let leader_arity = leader.arity();
    if leader_arity != 2 {
        return Err(XlogError::Kernel(format!(
            "{}: leader edge must be 2-column, got arity {}",
            entry_label, leader_arity
        )));
    }

    let Some(ty) = leader.schema.column_type(0) else {
        return Err(XlogError::Kernel(format!(
            "{}: leader edge column 0 type missing",
            entry_label
        )));
    };
    if width_class.validate_col_type(ty) {
        Ok(leader)
    } else {
        Err(XlogError::Kernel(format!(
            "{}: leader edge column 0 type {:?} not in {} width-class",
            entry_label,
            ty,
            width_class.label()
        )))
    }
}
impl CudaKernelProvider {
/// Validates the clique arguments for the 4-byte width class and builds the
/// leader metadata over column 0 of the leader edge on `launch_stream`.
///
/// # Errors
/// Propagates validation errors from `validate_clique_metadata_leader` and
/// errors from `wcoj_build_metadata_u32_recorded`.
fn wcoj_clique_metadata_recorded_u32_inner(
    &self,
    k: usize,
    edges: &[&CudaBuffer],
    leader_edge_idx: u32,
    launch_stream: StreamId,
    entry_label: &str,
) -> Result<WcojRelationMetadata<u32>> {
    let width_class = CliqueWidthClass::FourByte;
    validate_clique_metadata_leader(k, edges, leader_edge_idx, width_class, entry_label)
        .and_then(|leader| self.wcoj_build_metadata_u32_recorded(leader, 0, launch_stream))
}
/// Validates the clique arguments for the 8-byte width class and builds the
/// leader metadata over column 0 of the leader edge on `launch_stream`.
///
/// # Errors
/// Propagates validation errors from `validate_clique_metadata_leader` and
/// errors from `wcoj_build_metadata_u64_recorded`.
fn wcoj_clique_metadata_recorded_u64_inner(
    &self,
    k: usize,
    edges: &[&CudaBuffer],
    leader_edge_idx: u32,
    launch_stream: StreamId,
    entry_label: &str,
) -> Result<WcojRelationMetadata<u64>> {
    let width_class = CliqueWidthClass::EightByte;
    validate_clique_metadata_leader(k, edges, leader_edge_idx, width_class, entry_label)
        .and_then(|leader| self.wcoj_build_metadata_u64_recorded(leader, 0, launch_stream))
}
/// Builds leader metadata for a 5-clique (10 edges) over `u32` keys.
///
/// Delegates to `wcoj_clique_metadata_recorded_u32_inner` with `k = 5`.
pub fn wcoj_clique5_metadata_recorded_u32(
    &self,
    edges: &[&CudaBuffer; 10],
    leader_edge_idx: u32,
    launch_stream: StreamId,
) -> Result<WcojRelationMetadata<u32>> {
    const K: usize = 5;
    let entry_label = "wcoj_clique5_metadata_recorded_u32";
    self.wcoj_clique_metadata_recorded_u32_inner(
        K,
        &edges[..],
        leader_edge_idx,
        launch_stream,
        entry_label,
    )
}
/// Builds leader metadata for a 5-clique (10 edges) over `u64` keys.
///
/// Delegates to `wcoj_clique_metadata_recorded_u64_inner` with `k = 5`.
pub fn wcoj_clique5_metadata_recorded_u64(
    &self,
    edges: &[&CudaBuffer; 10],
    leader_edge_idx: u32,
    launch_stream: StreamId,
) -> Result<WcojRelationMetadata<u64>> {
    const K: usize = 5;
    let entry_label = "wcoj_clique5_metadata_recorded_u64";
    self.wcoj_clique_metadata_recorded_u64_inner(
        K,
        &edges[..],
        leader_edge_idx,
        launch_stream,
        entry_label,
    )
}
/// Builds leader metadata for a 6-clique (15 edges) over `u32` keys.
///
/// Delegates to `wcoj_clique_metadata_recorded_u32_inner` with `k = 6`.
pub fn wcoj_clique6_metadata_recorded_u32(
    &self,
    edges: &[&CudaBuffer; 15],
    leader_edge_idx: u32,
    launch_stream: StreamId,
) -> Result<WcojRelationMetadata<u32>> {
    const K: usize = 6;
    let entry_label = "wcoj_clique6_metadata_recorded_u32";
    self.wcoj_clique_metadata_recorded_u32_inner(
        K,
        &edges[..],
        leader_edge_idx,
        launch_stream,
        entry_label,
    )
}
/// Builds leader metadata for a 6-clique (15 edges) over `u64` keys.
///
/// Delegates to `wcoj_clique_metadata_recorded_u64_inner` with `k = 6`.
pub fn wcoj_clique6_metadata_recorded_u64(
    &self,
    edges: &[&CudaBuffer; 15],
    leader_edge_idx: u32,
    launch_stream: StreamId,
) -> Result<WcojRelationMetadata<u64>> {
    const K: usize = 6;
    let entry_label = "wcoj_clique6_metadata_recorded_u64";
    self.wcoj_clique_metadata_recorded_u64_inner(
        K,
        &edges[..],
        leader_edge_idx,
        launch_stream,
        entry_label,
    )
}
#[allow(clippy::too_many_arguments)]
fn wcoj_clique_recorded_inner(
&self,
k: usize,
edges: &[&CudaBuffer],
leader_edge_idx: u32,
edge_order: Option<&[u8]>,
iteration_order: Option<&[u8]>,
width_class: CliqueWidthClass,
launch_stream: StreamId,
entry_label: &str,
) -> Result<CudaBuffer> {
let runtime = self.memory().runtime().ok_or_else(|| {
XlogError::Kernel(format!(
"{} requires a runtime-backed GpuMemoryManager (with_runtime)",
entry_label
))
})?;
let cu_stream = runtime
.stream_pool()
.resolve(launch_stream)
.ok_or_else(|| {
XlogError::Kernel(format!(
"{}: launch_stream StreamId({}) does not resolve",
entry_label, launch_stream.0
))
})?;
if !(5..=8).contains(&k) {
return Err(XlogError::Kernel(format!(
"{}: k must be 5..8, got {}",
entry_label, k
)));
}
let expected_edges = k * (k - 1) / 2;
if edges.len() != expected_edges {
return Err(XlogError::Kernel(format!(
"{}: expected {} edges (= C({}, 2)), got {}",
entry_label,
expected_edges,
k,
edges.len()
)));
}
if usize::try_from(leader_edge_idx)
.ok()
.is_none_or(|idx| idx >= expected_edges)
{
return Err(XlogError::Kernel(format!(
"{}: leader_edge_idx {} out of range for {} edges",
entry_label, leader_edge_idx, expected_edges
)));
}
match (edge_order, iteration_order) {
(Some(edge_order), Some(iteration_order)) => {
validate_clique_u8_permutation(
edge_order,
expected_edges,
"edge_order",
entry_label,
)?;
validate_clique_u8_permutation(iteration_order, k, "iteration_order", entry_label)?;
}
(None, None) => {}
_ => {
return Err(XlogError::Kernel(format!(
"{}: edge_order and iteration_order must both be present or both be omitted",
entry_label
)));
}
}
for (i, buf) in edges.iter().enumerate() {
if buf.arity() != 2 {
return Err(XlogError::Kernel(format!(
"{}: edge[{}] must be 2-column, got arity {}",
entry_label,
i,
buf.arity()
)));
}
for col_idx in 0..2 {
let ty = buf.schema.column_type(col_idx).ok_or_else(|| {
XlogError::Kernel(format!(
"{}: edge[{}] column {} type missing",
entry_label, i, col_idx
))
})?;
if !width_class.validate_col_type(ty) {
return Err(XlogError::Kernel(format!(
"{}: edge[{}] column {} type {:?} not in {} width-class",
entry_label,
i,
col_idx,
ty,
width_class.label()
)));
}
}
}
let mut head_types = Vec::with_capacity(k);
let leader_slot = edge_order.map(|order| order[0] as usize).unwrap_or(0);
head_types.push(edges[leader_slot].schema.column_type(0).expect("validated"));
head_types.push(edges[leader_slot].schema.column_type(1).expect("validated"));
for i in 2..k {
let logical_edge = i - 1;
let edge_slot = edge_order
.map(|order| order[logical_edge] as usize)
.unwrap_or(logical_edge);
head_types.push(edges[edge_slot].schema.column_type(1).expect("validated"));
}
let out_schema = Schema::new(
head_types
.iter()
.enumerate()
.map(|(i, t)| (format!("col{}", i), *t))
.collect(),
);
let leader_slot = usize::try_from(leader_edge_idx).expect("validated");
let n_leader = self.logical_row_count_u32(edges[leader_slot])?;
if n_leader == 0 {
return self.create_empty_buffer(out_schema);
}
let metadata_start = Instant::now();
let leader_metadata = match width_class {
CliqueWidthClass::FourByte => {
CliqueLeaderMetadata::U32(self.wcoj_clique_metadata_recorded_u32_inner(
k,
edges,
leader_edge_idx,
launch_stream,
entry_label,
)?)
}
CliqueWidthClass::EightByte => {
CliqueLeaderMetadata::U64(self.wcoj_clique_metadata_recorded_u64_inner(
k,
edges,
leader_edge_idx,
launch_stream,
entry_label,
)?)
}
};
self.record_kclique_metadata_build_nanos(metadata_start.elapsed().as_nanos());
let leader_work_total = leader_metadata.total_rows_u32(entry_label)?;
if leader_work_total != n_leader {
return Err(XlogError::Kernel(format!(
"{}: leader metadata total {} does not match leader row count {}",
entry_label, leader_work_total, n_leader
)));
}
let leader_metadata_key_count = leader_metadata.key_count();
if leader_metadata_key_count == 0 {
return self.create_empty_buffer(out_schema);
}
let mut edge_col0_ptrs: Vec<u64> = Vec::with_capacity(expected_edges);
let mut edge_col1_ptrs: Vec<u64> = Vec::with_capacity(expected_edges);
let mut edge_n_host: Vec<u32> = Vec::with_capacity(expected_edges);
for buf in edges.iter() {
let col0 = buf.column(0).expect("validated");
let col1 = buf.column(1).expect("validated");
edge_col0_ptrs.push(*col0.device_ptr());
edge_col1_ptrs.push(*col1.device_ptr());
edge_n_host.push(self.logical_row_count_u32(buf)?);
}
let mut d_edge_col0 = self.memory.alloc::<u64>(expected_edges)?;
let mut d_edge_col1 = self.memory.alloc::<u64>(expected_edges)?;
let mut d_edge_n = self.memory.alloc::<u32>(expected_edges)?;
let device = self.device.inner();
self.htod_launch_metadata_sync_copy_into(&edge_col0_ptrs, &mut d_edge_col0)
.map_err(|e| {
XlogError::Kernel(format!(
"{}: htod edge_col0_ptrs failed: {}",
entry_label, e
))
})?;
self.htod_launch_metadata_sync_copy_into(&edge_col1_ptrs, &mut d_edge_col1)
.map_err(|e| {
XlogError::Kernel(format!(
"{}: htod edge_col1_ptrs failed: {}",
entry_label, e
))
})?;
self.htod_launch_metadata_sync_copy_into(&edge_n_host, &mut d_edge_n)
.map_err(|e| {
XlogError::Kernel(format!("{}: htod edge_n failed: {}", entry_label, e))
})?;
let d_edge_order = if let Some(edge_order) = edge_order {
let mut buf = self.memory.alloc::<u8>(expected_edges)?;
self.htod_launch_metadata_sync_copy_into(edge_order, &mut buf)
.map_err(|e| {
XlogError::Kernel(format!("{}: htod edge_order failed: {}", entry_label, e))
})?;
Some(buf)
} else {
None
};
let d_iteration_order = if let Some(iteration_order) = iteration_order {
let mut buf = self.memory.alloc::<u8>(k)?;
self.htod_launch_metadata_sync_copy_into(iteration_order, &mut buf)
.map_err(|e| {
XlogError::Kernel(format!(
"{}: htod iteration_order failed: {}",
entry_label, e
))
})?;
Some(buf)
} else {
None
};
let block_work_unit = crate::wcoj_metadata::WCOJ_HG_BLOCK_WORK_UNIT_DEFAULT;
let grid = leader_work_total.div_ceil(block_work_unit);
let count_buf = self.memory.alloc::<u32>(grid as usize)?;
let thread_counts_buf = self
.memory
.alloc::<u32>((grid as usize) * (BLOCK_SIZE as usize))?;
let mut offsets_buf = self.memory.alloc::<u32>(grid as usize)?;
let d_total = self.memory.alloc::<u32>(1)?;
let mut rec_count = LaunchRecorder::new_strict(launch_stream);
for buf in edges.iter() {
rec_count.read(buf.num_rows_device());
rec_count.read_column(buf.column(0).expect("validated"));
rec_count.read_column(buf.column(1).expect("validated"));
}
rec_count.read(&d_edge_col0);
rec_count.read(&d_edge_col1);
rec_count.read(&d_edge_n);
if let Some(buf) = d_edge_order.as_ref() {
rec_count.read(buf);
}
if let Some(buf) = d_iteration_order.as_ref() {
rec_count.read(buf);
}
match &leader_metadata {
CliqueLeaderMetadata::U32(leader_metadata) => {
rec_count.read(&leader_metadata.unique_keys);
rec_count.read(&leader_metadata.fan_out);
rec_count.read(&leader_metadata.prefix_sum);
}
CliqueLeaderMetadata::U64(leader_metadata) => {
rec_count.read(&leader_metadata.unique_keys);
rec_count.read(&leader_metadata.fan_out);
rec_count.read(&leader_metadata.prefix_sum);
}
}
rec_count.write(&count_buf);
rec_count.write(&thread_counts_buf);
rec_count.write(&offsets_buf);
rec_count.write(&d_total);
rec_count.preflight(runtime).map_err(|e| {
XlogError::Kernel(format!("{}: count preflight failed: {}", entry_label, e))
})?;
let count_kernel = device
.get_func(WCOJ_MODULE, clique_kernel_name(k, false, width_class))
.ok_or_else(|| {
XlogError::Kernel(format!(
"{}: count kernel '{}' not found",
entry_label,
clique_kernel_name(k, false, width_class)
))
})?;
let count_config = LaunchConfig {
grid_dim: (grid, 1, 1),
block_dim: (BLOCK_SIZE, 1, 1),
shared_mem_bytes: 0,
};
let null_order_ptr = 0_u64;
let edge_order_param = match d_edge_order.as_ref() {
Some(buf) => buf.as_kernel_param(),
None => null_order_ptr.as_kernel_param(),
};
let iteration_order_param = match d_iteration_order.as_ref() {
Some(buf) => buf.as_kernel_param(),
None => null_order_ptr.as_kernel_param(),
};
unsafe {
let mut params: Vec<*mut c_void> = match &leader_metadata {
CliqueLeaderMetadata::U32(leader_metadata) => vec![
(&d_edge_col0).as_kernel_param(),
(&d_edge_col1).as_kernel_param(),
(&d_edge_n).as_kernel_param(),
leader_edge_idx.as_kernel_param(),
edge_order_param,
iteration_order_param,
n_leader.as_kernel_param(),
(&leader_metadata.unique_keys).as_kernel_param(),
(&leader_metadata.fan_out).as_kernel_param(),
(&leader_metadata.prefix_sum).as_kernel_param(),
leader_metadata_key_count.as_kernel_param(),
block_work_unit.as_kernel_param(),
(&count_buf).as_kernel_param(),
(&thread_counts_buf).as_kernel_param(),
],
CliqueLeaderMetadata::U64(leader_metadata) => vec![
(&d_edge_col0).as_kernel_param(),
(&d_edge_col1).as_kernel_param(),
(&d_edge_n).as_kernel_param(),
leader_edge_idx.as_kernel_param(),
edge_order_param,
iteration_order_param,
n_leader.as_kernel_param(),
(&leader_metadata.unique_keys).as_kernel_param(),
(&leader_metadata.fan_out).as_kernel_param(),
(&leader_metadata.prefix_sum).as_kernel_param(),
leader_metadata_key_count.as_kernel_param(),
block_work_unit.as_kernel_param(),
(&count_buf).as_kernel_param(),
(&thread_counts_buf).as_kernel_param(),
],
};
count_kernel
.clone()
.launch_on_stream(&cu_stream, count_config, &mut params)
.map_err(|e| {
XlogError::Kernel(format!(
"{}: count kernel launch failed: {}",
entry_label, e
))
})?;
}
let bytes_count = (grid as usize) * std::mem::size_of::<u32>();
unsafe {
let res = sys::cuMemcpyDtoDAsync_v2(
*offsets_buf.device_ptr(),
*count_buf.device_ptr(),
bytes_count,
cu_stream.cu_stream(),
);
if res != sys::cudaError_enum::CUDA_SUCCESS {
return Err(XlogError::Kernel(format!(
"{}: dtod count → offsets failed: {:?}",
entry_label, res
)));
}
}
self.multiblock_scan_u32_inplace_on_stream(
&mut offsets_buf,
grid,
&cu_stream,
launch_stream,
runtime,
)?;
let total_kernel = device
.get_func(WCOJ_MODULE, wcoj_kernels::WCOJ_COMPUTE_TOTAL)
.ok_or_else(|| XlogError::Kernel("wcoj_compute_total kernel not found".to_string()))?;
unsafe {
total_kernel
.clone()
.launch_on_stream(
&cu_stream,
LaunchConfig {
grid_dim: (1, 1, 1),
block_dim: (1, 1, 1),
shared_mem_bytes: 0,
},
(&count_buf, &offsets_buf, grid, &d_total),
)
.map_err(|e| {
XlogError::Kernel(format!("wcoj_compute_total launch failed: {}", e))
})?;
}
rec_count.commit(runtime).map_err(|e| {
XlogError::Kernel(format!(
"{}: count+scan+total commit failed: {}",
entry_label, e
))
})?;
cu_stream.synchronize().map_err(|e| {
XlogError::Kernel(format!(
"{}: stream sync after total failed: {}",
entry_label, e
))
})?;
let total_rows = self
.dtoh_scalar_untracked::<u32>(&d_total, 0)
.map_err(|e| {
XlogError::Kernel(format!("{}: read d_total failed: {}", entry_label, e))
})?;
if total_rows == 0 {
return self.create_empty_buffer(out_schema);
}
let elem_bytes = width_class.elem_bytes();
let bytes_per_col = (total_rows as usize) * elem_bytes;
let mut out_col_bufs: Vec<TrackedCudaSlice<u8>> = Vec::with_capacity(k);
let mut out_col_ptrs: Vec<u64> = Vec::with_capacity(k);
for _ in 0..k {
let buf = self.memory.alloc::<u8>(bytes_per_col)?;
out_col_ptrs.push(*buf.device_ptr());
out_col_bufs.push(buf);
}
let mut d_out_cols = self.memory.alloc::<u64>(k)?;
self.htod_launch_metadata_sync_copy_into(&out_col_ptrs, &mut d_out_cols)
.map_err(|e| {
XlogError::Kernel(format!("{}: htod out_col_ptrs failed: {}", entry_label, e))
})?;
let out_d_num_rows = self.memory.alloc::<u32>(1)?;
self.htod_launch_metadata_async_copy_one(
&total_rows,
&out_d_num_rows,
&cu_stream,
&format!("{entry_label}: out_d_num_rows"),
)?;
let mut rec_mat = LaunchRecorder::new_strict(launch_stream);
for buf in edges.iter() {
rec_mat.read(buf.num_rows_device());
rec_mat.read_column(buf.column(0).expect("validated"));
rec_mat.read_column(buf.column(1).expect("validated"));
}
rec_mat.read(&d_edge_col0);
rec_mat.read(&d_edge_col1);
rec_mat.read(&d_edge_n);
if let Some(buf) = d_edge_order.as_ref() {
rec_mat.read(buf);
}
if let Some(buf) = d_iteration_order.as_ref() {
rec_mat.read(buf);
}
match &leader_metadata {
CliqueLeaderMetadata::U32(leader_metadata) => {
rec_mat.read(&leader_metadata.unique_keys);
rec_mat.read(&leader_metadata.fan_out);
rec_mat.read(&leader_metadata.prefix_sum);
}
CliqueLeaderMetadata::U64(leader_metadata) => {
rec_mat.read(&leader_metadata.unique_keys);
rec_mat.read(&leader_metadata.fan_out);
rec_mat.read(&leader_metadata.prefix_sum);
}
}
rec_mat.read(&thread_counts_buf);
rec_mat.read(&offsets_buf);
rec_mat.read(&d_out_cols);
for buf in out_col_bufs.iter() {
rec_mat.write(buf);
}
rec_mat.write(&out_d_num_rows);
rec_mat.preflight(runtime).map_err(|e| {
XlogError::Kernel(format!(
"{}: materialize preflight failed: {}",
entry_label, e
))
})?;
let materialize_kernel = device
.get_func(WCOJ_MODULE, clique_kernel_name(k, true, width_class))
.ok_or_else(|| {
XlogError::Kernel(format!(
"{}: materialize kernel '{}' not found",
entry_label,
clique_kernel_name(k, true, width_class)
))
})?;
let mat_config = LaunchConfig {
grid_dim: (grid, 1, 1),
block_dim: (BLOCK_SIZE, 1, 1),
shared_mem_bytes: 0,
};
unsafe {
let mut params: Vec<*mut c_void> = match &leader_metadata {
CliqueLeaderMetadata::U32(leader_metadata) => vec![
(&d_edge_col0).as_kernel_param(),
(&d_edge_col1).as_kernel_param(),
(&d_edge_n).as_kernel_param(),
leader_edge_idx.as_kernel_param(),
edge_order_param,
iteration_order_param,
n_leader.as_kernel_param(),
(&leader_metadata.unique_keys).as_kernel_param(),
(&leader_metadata.fan_out).as_kernel_param(),
(&leader_metadata.prefix_sum).as_kernel_param(),
leader_metadata_key_count.as_kernel_param(),
block_work_unit.as_kernel_param(),
(&thread_counts_buf).as_kernel_param(),
(&offsets_buf).as_kernel_param(),
total_rows.as_kernel_param(),
(&d_out_cols).as_kernel_param(),
],
CliqueLeaderMetadata::U64(leader_metadata) => vec![
(&d_edge_col0).as_kernel_param(),
(&d_edge_col1).as_kernel_param(),
(&d_edge_n).as_kernel_param(),
leader_edge_idx.as_kernel_param(),
edge_order_param,
iteration_order_param,
n_leader.as_kernel_param(),
(&leader_metadata.unique_keys).as_kernel_param(),
(&leader_metadata.fan_out).as_kernel_param(),
(&leader_metadata.prefix_sum).as_kernel_param(),
leader_metadata_key_count.as_kernel_param(),
block_work_unit.as_kernel_param(),
(&thread_counts_buf).as_kernel_param(),
(&offsets_buf).as_kernel_param(),
total_rows.as_kernel_param(),
(&d_out_cols).as_kernel_param(),
],
};
materialize_kernel
.clone()
.launch_on_stream(&cu_stream, mat_config, &mut params)
.map_err(|e| {
XlogError::Kernel(format!("{}: materialize launch failed: {}", entry_label, e))
})?;
}
rec_mat.commit(runtime).map_err(|e| {
XlogError::Kernel(format!("{}: materialize commit failed: {}", entry_label, e))
})?;
let columns: Vec<CudaColumn> = out_col_bufs.into_iter().map(|b| b.into()).collect();
Ok(CudaBuffer::from_columns_with_host_count(
columns,
total_rows as u64,
out_d_num_rows,
out_schema,
total_rows,
))
}
/// Runs the recorded 5-clique join with the `FourByte` width class and no
/// caller-supplied plan.
///
/// Thin wrapper over `wcoj_clique_recorded_inner`: forwards the 10 edge
/// buffers with clique size 5, leader edge index 0, and `None` for both the
/// edge order and the iteration order. The label
/// `"wcoj_clique5_u32_recorded"` is passed along so the inner routine can tag
/// its error messages. This wrapper performs no checks of its own.
///
/// # Errors
/// Propagates any error returned by the inner routine.
pub fn wcoj_clique5_u32_recorded(
    &self,
    edges: &[&CudaBuffer; 10],
    launch_stream: StreamId,
) -> Result<CudaBuffer> {
    let clique_size = 5;
    let default_leader = 0;
    let key_width = CliqueWidthClass::FourByte;
    let entry_label = "wcoj_clique5_u32_recorded";
    self.wcoj_clique_recorded_inner(
        clique_size,
        edges,
        default_leader,
        None,
        None,
        key_width,
        launch_stream,
        entry_label,
    )
}
/// Runs the recorded 5-clique join with the `FourByte` width class using a
/// caller-supplied plan.
///
/// Thin wrapper over `wcoj_clique_recorded_inner`: forwards the 10 edge
/// buffers with clique size 5 together with `leader_edge_idx`, and wraps
/// `edge_order` and `iteration_order` in `Some`. The label
/// `"wcoj_clique5_u32_recorded_planned"` is passed along so the inner routine
/// can tag its error messages. This wrapper performs no checks of its own.
///
/// # Errors
/// Propagates any error returned by the inner routine.
pub fn wcoj_clique5_u32_recorded_planned(
    &self,
    edges: &[&CudaBuffer; 10],
    leader_edge_idx: u32,
    edge_order: &[u8],
    iteration_order: &[u8],
    launch_stream: StreamId,
) -> Result<CudaBuffer> {
    let clique_size = 5;
    let planned_edge_order = Some(edge_order);
    let planned_iteration_order = Some(iteration_order);
    let key_width = CliqueWidthClass::FourByte;
    let entry_label = "wcoj_clique5_u32_recorded_planned";
    self.wcoj_clique_recorded_inner(
        clique_size,
        edges,
        leader_edge_idx,
        planned_edge_order,
        planned_iteration_order,
        key_width,
        launch_stream,
        entry_label,
    )
}
/// Runs the recorded 5-clique join with the `EightByte` width class and no
/// caller-supplied plan.
///
/// Thin wrapper over `wcoj_clique_recorded_inner`: forwards the 10 edge
/// buffers with clique size 5, leader edge index 0, and `None` for both the
/// edge order and the iteration order. The label
/// `"wcoj_clique5_u64_recorded"` is passed along so the inner routine can tag
/// its error messages. This wrapper performs no checks of its own.
///
/// # Errors
/// Propagates any error returned by the inner routine.
pub fn wcoj_clique5_u64_recorded(
    &self,
    edges: &[&CudaBuffer; 10],
    launch_stream: StreamId,
) -> Result<CudaBuffer> {
    let clique_size = 5;
    let default_leader = 0;
    let key_width = CliqueWidthClass::EightByte;
    let entry_label = "wcoj_clique5_u64_recorded";
    self.wcoj_clique_recorded_inner(
        clique_size,
        edges,
        default_leader,
        None,
        None,
        key_width,
        launch_stream,
        entry_label,
    )
}
/// Runs the recorded 5-clique join with the `EightByte` width class using a
/// caller-supplied plan.
///
/// Thin wrapper over `wcoj_clique_recorded_inner`: forwards the 10 edge
/// buffers with clique size 5 together with `leader_edge_idx`, and wraps
/// `edge_order` and `iteration_order` in `Some`. The label
/// `"wcoj_clique5_u64_recorded_planned"` is passed along so the inner routine
/// can tag its error messages. This wrapper performs no checks of its own.
///
/// # Errors
/// Propagates any error returned by the inner routine.
pub fn wcoj_clique5_u64_recorded_planned(
    &self,
    edges: &[&CudaBuffer; 10],
    leader_edge_idx: u32,
    edge_order: &[u8],
    iteration_order: &[u8],
    launch_stream: StreamId,
) -> Result<CudaBuffer> {
    let clique_size = 5;
    let planned_edge_order = Some(edge_order);
    let planned_iteration_order = Some(iteration_order);
    let key_width = CliqueWidthClass::EightByte;
    let entry_label = "wcoj_clique5_u64_recorded_planned";
    self.wcoj_clique_recorded_inner(
        clique_size,
        edges,
        leader_edge_idx,
        planned_edge_order,
        planned_iteration_order,
        key_width,
        launch_stream,
        entry_label,
    )
}
/// Runs the recorded 6-clique join with the `FourByte` width class and no
/// caller-supplied plan.
///
/// Thin wrapper over `wcoj_clique_recorded_inner`: forwards the 15 edge
/// buffers with clique size 6, leader edge index 0, and `None` for both the
/// edge order and the iteration order. The label
/// `"wcoj_clique6_u32_recorded"` is passed along so the inner routine can tag
/// its error messages. This wrapper performs no checks of its own.
///
/// # Errors
/// Propagates any error returned by the inner routine.
pub fn wcoj_clique6_u32_recorded(
    &self,
    edges: &[&CudaBuffer; 15],
    launch_stream: StreamId,
) -> Result<CudaBuffer> {
    let clique_size = 6;
    let default_leader = 0;
    let key_width = CliqueWidthClass::FourByte;
    let entry_label = "wcoj_clique6_u32_recorded";
    self.wcoj_clique_recorded_inner(
        clique_size,
        edges,
        default_leader,
        None,
        None,
        key_width,
        launch_stream,
        entry_label,
    )
}
/// Runs the recorded 6-clique join with the `FourByte` width class using a
/// caller-supplied plan.
///
/// Thin wrapper over `wcoj_clique_recorded_inner`: forwards the 15 edge
/// buffers with clique size 6 together with `leader_edge_idx`, and wraps
/// `edge_order` and `iteration_order` in `Some`. The label
/// `"wcoj_clique6_u32_recorded_planned"` is passed along so the inner routine
/// can tag its error messages. This wrapper performs no checks of its own.
///
/// # Errors
/// Propagates any error returned by the inner routine.
pub fn wcoj_clique6_u32_recorded_planned(
    &self,
    edges: &[&CudaBuffer; 15],
    leader_edge_idx: u32,
    edge_order: &[u8],
    iteration_order: &[u8],
    launch_stream: StreamId,
) -> Result<CudaBuffer> {
    let clique_size = 6;
    let planned_edge_order = Some(edge_order);
    let planned_iteration_order = Some(iteration_order);
    let key_width = CliqueWidthClass::FourByte;
    let entry_label = "wcoj_clique6_u32_recorded_planned";
    self.wcoj_clique_recorded_inner(
        clique_size,
        edges,
        leader_edge_idx,
        planned_edge_order,
        planned_iteration_order,
        key_width,
        launch_stream,
        entry_label,
    )
}
/// Runs the recorded 6-clique join with the `EightByte` width class and no
/// caller-supplied plan.
///
/// Thin wrapper over `wcoj_clique_recorded_inner`: forwards the 15 edge
/// buffers with clique size 6, leader edge index 0, and `None` for both the
/// edge order and the iteration order. The label
/// `"wcoj_clique6_u64_recorded"` is passed along so the inner routine can tag
/// its error messages. This wrapper performs no checks of its own.
///
/// # Errors
/// Propagates any error returned by the inner routine.
pub fn wcoj_clique6_u64_recorded(
    &self,
    edges: &[&CudaBuffer; 15],
    launch_stream: StreamId,
) -> Result<CudaBuffer> {
    let clique_size = 6;
    let default_leader = 0;
    let key_width = CliqueWidthClass::EightByte;
    let entry_label = "wcoj_clique6_u64_recorded";
    self.wcoj_clique_recorded_inner(
        clique_size,
        edges,
        default_leader,
        None,
        None,
        key_width,
        launch_stream,
        entry_label,
    )
}
/// Runs the recorded 6-clique join with the `EightByte` width class using a
/// caller-supplied plan.
///
/// Thin wrapper over `wcoj_clique_recorded_inner`: forwards the 15 edge
/// buffers with clique size 6 together with `leader_edge_idx`, and wraps
/// `edge_order` and `iteration_order` in `Some`. The label
/// `"wcoj_clique6_u64_recorded_planned"` is passed along so the inner routine
/// can tag its error messages. This wrapper performs no checks of its own.
///
/// # Errors
/// Propagates any error returned by the inner routine.
pub fn wcoj_clique6_u64_recorded_planned(
    &self,
    edges: &[&CudaBuffer; 15],
    leader_edge_idx: u32,
    edge_order: &[u8],
    iteration_order: &[u8],
    launch_stream: StreamId,
) -> Result<CudaBuffer> {
    let clique_size = 6;
    let planned_edge_order = Some(edge_order);
    let planned_iteration_order = Some(iteration_order);
    let key_width = CliqueWidthClass::EightByte;
    let entry_label = "wcoj_clique6_u64_recorded_planned";
    self.wcoj_clique_recorded_inner(
        clique_size,
        edges,
        leader_edge_idx,
        planned_edge_order,
        planned_iteration_order,
        key_width,
        launch_stream,
        entry_label,
    )
}
/// Runs the recorded 7-clique join with the `FourByte` width class and no
/// caller-supplied plan.
///
/// Thin wrapper over `wcoj_clique_recorded_inner`: forwards the 21 edge
/// buffers with clique size 7, leader edge index 0, and `None` for both the
/// edge order and the iteration order. The label
/// `"wcoj_clique7_u32_recorded"` is passed along so the inner routine can tag
/// its error messages. This wrapper performs no checks of its own.
///
/// # Errors
/// Propagates any error returned by the inner routine.
pub fn wcoj_clique7_u32_recorded(
    &self,
    edges: &[&CudaBuffer; 21],
    launch_stream: StreamId,
) -> Result<CudaBuffer> {
    let clique_size = 7;
    let default_leader = 0;
    let key_width = CliqueWidthClass::FourByte;
    let entry_label = "wcoj_clique7_u32_recorded";
    self.wcoj_clique_recorded_inner(
        clique_size,
        edges,
        default_leader,
        None,
        None,
        key_width,
        launch_stream,
        entry_label,
    )
}
/// Runs the recorded 7-clique join with the `FourByte` width class using a
/// caller-supplied plan.
///
/// Thin wrapper over `wcoj_clique_recorded_inner`: forwards the 21 edge
/// buffers with clique size 7 together with `leader_edge_idx`, and wraps
/// `edge_order` and `iteration_order` in `Some`. The label
/// `"wcoj_clique7_u32_recorded_planned"` is passed along so the inner routine
/// can tag its error messages. This wrapper performs no checks of its own.
///
/// # Errors
/// Propagates any error returned by the inner routine.
pub fn wcoj_clique7_u32_recorded_planned(
    &self,
    edges: &[&CudaBuffer; 21],
    leader_edge_idx: u32,
    edge_order: &[u8],
    iteration_order: &[u8],
    launch_stream: StreamId,
) -> Result<CudaBuffer> {
    let clique_size = 7;
    let planned_edge_order = Some(edge_order);
    let planned_iteration_order = Some(iteration_order);
    let key_width = CliqueWidthClass::FourByte;
    let entry_label = "wcoj_clique7_u32_recorded_planned";
    self.wcoj_clique_recorded_inner(
        clique_size,
        edges,
        leader_edge_idx,
        planned_edge_order,
        planned_iteration_order,
        key_width,
        launch_stream,
        entry_label,
    )
}
/// Runs the recorded 7-clique join with the `EightByte` width class and no
/// caller-supplied plan.
///
/// Thin wrapper over `wcoj_clique_recorded_inner`: forwards the 21 edge
/// buffers with clique size 7, leader edge index 0, and `None` for both the
/// edge order and the iteration order. The label
/// `"wcoj_clique7_u64_recorded"` is passed along so the inner routine can tag
/// its error messages. This wrapper performs no checks of its own.
///
/// # Errors
/// Propagates any error returned by the inner routine.
pub fn wcoj_clique7_u64_recorded(
    &self,
    edges: &[&CudaBuffer; 21],
    launch_stream: StreamId,
) -> Result<CudaBuffer> {
    let clique_size = 7;
    let default_leader = 0;
    let key_width = CliqueWidthClass::EightByte;
    let entry_label = "wcoj_clique7_u64_recorded";
    self.wcoj_clique_recorded_inner(
        clique_size,
        edges,
        default_leader,
        None,
        None,
        key_width,
        launch_stream,
        entry_label,
    )
}
/// Runs the recorded 7-clique join with the `EightByte` width class using a
/// caller-supplied plan.
///
/// Thin wrapper over `wcoj_clique_recorded_inner`: forwards the 21 edge
/// buffers with clique size 7 together with `leader_edge_idx`, and wraps
/// `edge_order` and `iteration_order` in `Some`. The label
/// `"wcoj_clique7_u64_recorded_planned"` is passed along so the inner routine
/// can tag its error messages. This wrapper performs no checks of its own.
///
/// # Errors
/// Propagates any error returned by the inner routine.
pub fn wcoj_clique7_u64_recorded_planned(
    &self,
    edges: &[&CudaBuffer; 21],
    leader_edge_idx: u32,
    edge_order: &[u8],
    iteration_order: &[u8],
    launch_stream: StreamId,
) -> Result<CudaBuffer> {
    let clique_size = 7;
    let planned_edge_order = Some(edge_order);
    let planned_iteration_order = Some(iteration_order);
    let key_width = CliqueWidthClass::EightByte;
    let entry_label = "wcoj_clique7_u64_recorded_planned";
    self.wcoj_clique_recorded_inner(
        clique_size,
        edges,
        leader_edge_idx,
        planned_edge_order,
        planned_iteration_order,
        key_width,
        launch_stream,
        entry_label,
    )
}
/// Runs the recorded 8-clique join with the `FourByte` width class and no
/// caller-supplied plan.
///
/// Thin wrapper over `wcoj_clique_recorded_inner`: forwards the 28 edge
/// buffers with clique size 8, leader edge index 0, and `None` for both the
/// edge order and the iteration order. The label
/// `"wcoj_clique8_u32_recorded"` is passed along so the inner routine can tag
/// its error messages. This wrapper performs no checks of its own.
///
/// # Errors
/// Propagates any error returned by the inner routine.
pub fn wcoj_clique8_u32_recorded(
    &self,
    edges: &[&CudaBuffer; 28],
    launch_stream: StreamId,
) -> Result<CudaBuffer> {
    let clique_size = 8;
    let default_leader = 0;
    let key_width = CliqueWidthClass::FourByte;
    let entry_label = "wcoj_clique8_u32_recorded";
    self.wcoj_clique_recorded_inner(
        clique_size,
        edges,
        default_leader,
        None,
        None,
        key_width,
        launch_stream,
        entry_label,
    )
}
/// Runs the recorded 8-clique join with the `FourByte` width class using a
/// caller-supplied plan.
///
/// Thin wrapper over `wcoj_clique_recorded_inner`: forwards the 28 edge
/// buffers with clique size 8 together with `leader_edge_idx`, and wraps
/// `edge_order` and `iteration_order` in `Some`. The label
/// `"wcoj_clique8_u32_recorded_planned"` is passed along so the inner routine
/// can tag its error messages. This wrapper performs no checks of its own.
///
/// # Errors
/// Propagates any error returned by the inner routine.
pub fn wcoj_clique8_u32_recorded_planned(
    &self,
    edges: &[&CudaBuffer; 28],
    leader_edge_idx: u32,
    edge_order: &[u8],
    iteration_order: &[u8],
    launch_stream: StreamId,
) -> Result<CudaBuffer> {
    let clique_size = 8;
    let planned_edge_order = Some(edge_order);
    let planned_iteration_order = Some(iteration_order);
    let key_width = CliqueWidthClass::FourByte;
    let entry_label = "wcoj_clique8_u32_recorded_planned";
    self.wcoj_clique_recorded_inner(
        clique_size,
        edges,
        leader_edge_idx,
        planned_edge_order,
        planned_iteration_order,
        key_width,
        launch_stream,
        entry_label,
    )
}
/// Runs the recorded 8-clique join with the `EightByte` width class and no
/// caller-supplied plan.
///
/// Thin wrapper over `wcoj_clique_recorded_inner`: forwards the 28 edge
/// buffers with clique size 8, leader edge index 0, and `None` for both the
/// edge order and the iteration order. The label
/// `"wcoj_clique8_u64_recorded"` is passed along so the inner routine can tag
/// its error messages. This wrapper performs no checks of its own.
///
/// # Errors
/// Propagates any error returned by the inner routine.
pub fn wcoj_clique8_u64_recorded(
    &self,
    edges: &[&CudaBuffer; 28],
    launch_stream: StreamId,
) -> Result<CudaBuffer> {
    let clique_size = 8;
    let default_leader = 0;
    let key_width = CliqueWidthClass::EightByte;
    let entry_label = "wcoj_clique8_u64_recorded";
    self.wcoj_clique_recorded_inner(
        clique_size,
        edges,
        default_leader,
        None,
        None,
        key_width,
        launch_stream,
        entry_label,
    )
}
/// Runs the recorded 8-clique join with the `EightByte` width class using a
/// caller-supplied plan.
///
/// Thin wrapper over `wcoj_clique_recorded_inner`: forwards the 28 edge
/// buffers with clique size 8 together with `leader_edge_idx`, and wraps
/// `edge_order` and `iteration_order` in `Some`. The label
/// `"wcoj_clique8_u64_recorded_planned"` is passed along so the inner routine
/// can tag its error messages. This wrapper performs no checks of its own.
///
/// # Errors
/// Propagates any error returned by the inner routine.
pub fn wcoj_clique8_u64_recorded_planned(
    &self,
    edges: &[&CudaBuffer; 28],
    leader_edge_idx: u32,
    edge_order: &[u8],
    iteration_order: &[u8],
    launch_stream: StreamId,
) -> Result<CudaBuffer> {
    let clique_size = 8;
    let planned_edge_order = Some(edge_order);
    let planned_iteration_order = Some(iteration_order);
    let key_width = CliqueWidthClass::EightByte;
    let entry_label = "wcoj_clique8_u64_recorded_planned";
    self.wcoj_clique_recorded_inner(
        clique_size,
        edges,
        leader_edge_idx,
        planned_edge_order,
        planned_iteration_order,
        key_width,
        launch_stream,
        entry_label,
    )
}
}
/// Checks that `values` is a permutation of the integers `0..len`.
///
/// `label` names the sequence being checked and `entry_label` names the
/// calling entry point; both are only used to build error messages.
///
/// # Errors
/// Returns `XlogError::Kernel` when:
/// * `values.len()` differs from `len`;
/// * an element is `>= len`;
/// * an element occurs more than once.
///
/// Elements are examined in order, so the first offending element decides
/// which of the last two errors is reported.
fn validate_clique_u8_permutation(
    values: &[u8],
    len: usize,
    label: &str,
    entry_label: &str,
) -> Result<()> {
    let actual_len = values.len();
    if actual_len != len {
        return Err(XlogError::Kernel(format!(
            "{}: {} length {} must equal {}",
            entry_label, label, actual_len, len
        )));
    }

    // One flag per expected value; a flag that is already set means a repeat.
    let mut visited = vec![false; len];
    for value in values.iter().copied() {
        match visited.get_mut(usize::from(value)) {
            None => {
                return Err(XlogError::Kernel(format!(
                    "{}: {} value {} out of range 0..{}",
                    entry_label, label, value, len
                )));
            }
            Some(slot) => {
                // Mark the slot and learn in one step whether it was already marked.
                let already_seen = std::mem::replace(slot, true);
                if already_seen {
                    return Err(XlogError::Kernel(format!(
                        "{}: {} duplicates value {}",
                        entry_label, label, value
                    )));
                }
            }
        }
    }
    Ok(())
}