#![cfg(feature = "cuda")]
use core::ffi::c_int;
use std::vec;
use std::vec::Vec;
use dsfb_gpu_debug_core::candidate::{CandidateConfig, CandidateInterval};
use dsfb_gpu_debug_core::casefile::{build_from_artifacts_with_mode, CaseFile, EmissionMode};
use dsfb_gpu_debug_core::consensus::ConsensusCell;
use dsfb_gpu_debug_core::contract::Contract;
use dsfb_gpu_debug_core::detector::{DetectorCell, DetectorThresholds};
use dsfb_gpu_debug_core::event::{GpuTraceEventCompact, TraceEvent};
use dsfb_gpu_debug_core::motif::DetectorProfile;
use dsfb_gpu_debug_core::residual::{Baseline, ResidualCell};
use dsfb_gpu_debug_core::sign::SignCell;
use dsfb_gpu_debug_core::window::{compute_features, WindowFeature};
use crate::ffi::{
dsfb_gpu_run_pipeline, dsfb_gpu_run_pipeline_batched, dsfb_gpu_run_pipeline_on_workspace,
DetectorThresholdsFfi, PipelineTimingsFfi,
};
use crate::workspace::{
BatchedGpuWorkspace, GpuWorkspace, GraphCaptureStatus,
MAX_CANDIDATES_PER_ENTITY as WS_MAX_CANDIDATES_PER_ENTITY,
};
use crate::GpuError;
/// Per-entity candidate slot capacity, taken from the workspace module as the `i32` that the
/// FFI pipeline entry points accept. Host candidate buffers in this module are laid out as
/// `n_entities` consecutive runs of this many slots (entity `e` starts at
/// `e * MAX_CANDIDATES_PER_ENTITY`).
const MAX_CANDIDATES_PER_ENTITY: i32 = WS_MAX_CANDIDATES_PER_ENTITY;
/// Timing record filled in by the CUDA pipeline when a non-null timings pointer is passed;
/// an alias of the FFI struct.
pub type PipelineTimings = PipelineTimingsFfi;
/// Builds an audit-mode [`CaseFile`] for `events` by running the CUDA pipeline on
/// host buffers allocated for this call. Timings are not collected.
///
/// # Errors
///
/// Returns [`GpuError::KernelFailed`] when the pipeline reports a non-zero status.
pub fn build_gpu(events: &[TraceEvent], contract: &Contract) -> Result<CaseFile, GpuError> {
    build_gpu_inner(events, contract, false).map(|(case, _timings)| case)
}
/// Like [`build_gpu`], but also returns the [`PipelineTimings`] the pipeline filled in.
///
/// # Errors
///
/// Returns [`GpuError::KernelFailed`] when the pipeline reports a non-zero status.
pub fn build_gpu_timed(
    events: &[TraceEvent],
    contract: &Contract,
) -> Result<(CaseFile, PipelineTimings), GpuError> {
    let collect_timings = true;
    build_gpu_inner(events, contract, collect_timings)
}
fn build_gpu_inner(
events: &[TraceEvent],
contract: &Contract,
want_timings: bool,
) -> Result<(CaseFile, PipelineTimings), GpuError> {
let features: Vec<WindowFeature> = compute_features(
events,
contract.n_windows,
contract.n_entities,
u64::from(contract.window_size_ms) * 1_000_000,
);
let total: usize = (contract.n_entities as usize) * (contract.n_windows as usize);
let mut residuals: Vec<ResidualCell> = vec![ResidualCell::default(); total];
let mut signs: Vec<SignCell> = vec![SignCell::default(); total];
let mut detectors: Vec<DetectorCell> = vec![DetectorCell::default(); total];
let mut consensus: Vec<ConsensusCell> = vec![ConsensusCell::default(); total];
let candidate_capacity = (contract.n_entities as usize) * (MAX_CANDIDATES_PER_ENTITY as usize);
let mut candidate_buf: Vec<CandidateInterval> =
vec![CandidateInterval::default(); candidate_capacity];
let mut candidate_count: Vec<i32> = vec![0i32; contract.n_entities as usize];
let thresholds = DetectorThresholdsFfi::from(&DetectorThresholds::CANONICAL);
let baseline = Baseline::CANONICAL;
let candidate_config = CandidateConfig::CANONICAL;
let mut timings = PipelineTimings::default();
let timings_ptr: *mut PipelineTimings = if want_timings {
std::ptr::from_mut::<PipelineTimings>(&mut timings)
} else {
std::ptr::null_mut()
};
let status: c_int = call_kernel(
&features,
&mut residuals,
&mut signs,
&mut detectors,
&mut consensus,
&mut candidate_buf,
&mut candidate_count,
contract,
&thresholds,
baseline,
&candidate_config,
timings_ptr,
);
if status != 0 {
return Err(GpuError::KernelFailed(status));
}
let mut candidates: Vec<CandidateInterval> = Vec::with_capacity(candidate_capacity);
for entity_id in 0..(contract.n_entities as usize) {
let count = candidate_count[entity_id] as usize;
let base = entity_id * MAX_CANDIDATES_PER_ENTITY as usize;
for i in 0..count {
candidates.push(candidate_buf[base + i]);
}
}
let case = build_from_artifacts_with_mode(
events,
contract,
"cuda",
EmissionMode::Audit,
&features,
&residuals,
&signs,
&detectors,
&consensus,
&candidates,
);
Ok((case, timings))
}
/// Builds an audit-mode [`CaseFile`], reusing the buffers held by `workspace`.
///
/// # Errors
///
/// This function fails in two ways:
/// - It propagates the error from the workspace/contract compatibility check.
/// - It returns [`GpuError::KernelFailed`] when the pipeline reports a non-zero status.
pub fn build_gpu_on_workspace(
    workspace: &mut GpuWorkspace,
    events: &[TraceEvent],
    contract: &Contract,
) -> Result<CaseFile, GpuError> {
    build_gpu_on_workspace_inner(workspace, events, contract, EmissionMode::Audit, false)
        .map(|(case, _timings)| case)
}
/// Builds a throughput-mode [`CaseFile`], reusing the buffers held by `workspace`.
///
/// # Errors
///
/// This function fails in two ways:
/// - It propagates the error from the workspace/contract compatibility check.
/// - It returns [`GpuError::KernelFailed`] when the pipeline reports a non-zero status.
pub fn build_gpu_throughput_on_workspace(
    workspace: &mut GpuWorkspace,
    events: &[TraceEvent],
    contract: &Contract,
) -> Result<CaseFile, GpuError> {
    let mode = EmissionMode::Throughput;
    build_gpu_on_workspace_inner(workspace, events, contract, mode, false)
        .map(|(case, _timings)| case)
}
/// Audit-mode workspace build that also returns the [`PipelineTimings`] filled in by the
/// pipeline.
///
/// # Errors
///
/// This function fails in two ways:
/// - It propagates the error from the workspace/contract compatibility check.
/// - It returns [`GpuError::KernelFailed`] when the pipeline reports a non-zero status.
pub fn build_gpu_timed_on_workspace(
    workspace: &mut GpuWorkspace,
    events: &[TraceEvent],
    contract: &Contract,
) -> Result<(CaseFile, PipelineTimings), GpuError> {
    let collect_timings = true;
    build_gpu_on_workspace_inner(
        workspace,
        events,
        contract,
        EmissionMode::Audit,
        collect_timings,
    )
}
/// Shared implementation of the `build_gpu*_on_workspace` entry points.
///
/// The steps are:
/// 1. Check `workspace` against `contract`.
/// 2. Compute window features on the host.
/// 3. Run [`dsfb_gpu_run_pipeline_on_workspace`] with the workspace's `d_*` handles and its
///    host-side output buffers.
/// 4. Gather each entity's reported candidates.
/// 5. Build a case file in the requested `mode` from the workspace host buffers.
///
/// When `want_timings` is false, a null timings pointer is passed and the returned
/// [`PipelineTimings`] is `PipelineTimings::default()`.
///
/// # Errors
///
/// This function fails in two ways:
/// - It propagates the error from `assert_compatible`.
/// - It returns [`GpuError::KernelFailed`] when the pipeline reports a non-zero status.
fn build_gpu_on_workspace_inner(
    workspace: &mut GpuWorkspace,
    events: &[TraceEvent],
    contract: &Contract,
    mode: EmissionMode,
    want_timings: bool,
) -> Result<(CaseFile, PipelineTimings), GpuError> {
    workspace.assert_compatible(contract)?;
    let features: Vec<WindowFeature> = compute_features(
        events,
        contract.n_windows,
        contract.n_entities,
        u64::from(contract.window_size_ms) * 1_000_000,
    );
    let thresholds = DetectorThresholdsFfi::from(&DetectorThresholds::CANONICAL);
    let baseline = Baseline::CANONICAL;
    let candidate_config = CandidateConfig::CANONICAL;
    let mut timings = PipelineTimings::default();
    let timings_ptr: *mut PipelineTimings = if want_timings {
        std::ptr::from_mut::<PipelineTimings>(&mut timings)
    } else {
        std::ptr::null_mut()
    };
    // SAFETY: every host pointer comes from a buffer that outlives the call: `features`,
    // `thresholds`, `timings`, and the workspace's own vectors.
    // NOTE(review): buffer sizes are assumed to be guaranteed by `assert_compatible` above.
    // Confirm this in the workspace module.
    #[allow(unsafe_code)]
    let status = unsafe {
        dsfb_gpu_run_pipeline_on_workspace(
            workspace.d_features(),
            workspace.d_residuals(),
            workspace.d_signs(),
            workspace.d_detectors(),
            workspace.d_consensus(),
            workspace.d_candidates(),
            workspace.d_candidate_count(),
            features.as_ptr(),
            contract.n_entities as i32,
            contract.n_windows as i32,
            contract.ewma_alpha_q16_raw,
            baseline.latency_us,
            baseline.error_rate_q16_raw,
            std::ptr::from_ref::<DetectorThresholdsFfi>(&thresholds),
            candidate_config.min_detector_count as i32,
            candidate_config.min_residual_q_raw,
            candidate_config.min_length_windows as i32,
            MAX_CANDIDATES_PER_ENTITY,
            workspace.residuals.as_mut_ptr(),
            workspace.signs.as_mut_ptr(),
            workspace.detectors.as_mut_ptr(),
            workspace.consensus.as_mut_ptr(),
            workspace.candidate_buf.as_mut_ptr(),
            workspace.candidate_count.as_mut_ptr(),
            timings_ptr,
        )
    };
    if status != 0 {
        return Err(GpuError::KernelFailed(status));
    }
    let per_entity = MAX_CANDIDATES_PER_ENTITY as usize;
    let mut candidates: Vec<CandidateInterval> = Vec::with_capacity(workspace.candidate_buf.len());
    for entity_id in 0..(contract.n_entities as usize) {
        // The reported count may exceed this entity's slot capacity, or be negative on a
        // malformed result. Clamp it so one entity never reads another entity's slots and
        // never indexes past the buffer.
        let count = usize::try_from(workspace.candidate_count[entity_id])
            .unwrap_or(0)
            .min(per_entity);
        let base = entity_id * per_entity;
        candidates.extend_from_slice(&workspace.candidate_buf[base..base + count]);
    }
    let case = build_from_artifacts_with_mode(
        events,
        contract,
        "cuda",
        mode,
        &features,
        &workspace.residuals,
        &workspace.signs,
        &workspace.detectors,
        &workspace.consensus,
        &candidates,
    );
    Ok((case, timings))
}
/// Packs `events` into the compact GPU layout in `pinned`.
///
/// Converts `min(events.len(), pinned.len())` entries; any extra elements of `pinned` are
/// left untouched. The implementation is chosen at compile time:
/// - The AVX2 packer, on x86_64 builds with the `avx2` target feature statically enabled.
/// - The scalar packer, otherwise.
#[inline(always)]
#[allow(clippy::inline_always)]
fn pack_events_to_pinned_simd(events: &[TraceEvent], pinned: &mut [GpuTraceEventCompact]) {
    #[cfg(all(target_arch = "x86_64", target_feature = "avx2"))]
    {
        // SAFETY: this branch is compiled only when `avx2` is enabled for the whole build.
        // That satisfies the callee's `#[target_feature(enable = "avx2")]` requirement. The
        // callee bounds every access by `min(events.len(), pinned.len())`.
        #[allow(unsafe_code)]
        unsafe {
            pack_events_to_pinned_avx2(events, pinned);
        }
    }
    #[cfg(not(all(target_arch = "x86_64", target_feature = "avx2")))]
    pack_events_to_pinned_scalar(events, pinned);
}
/// Portable fallback for [`pack_events_to_pinned_simd`].
///
/// Converts the first `min(events.len(), pinned.len())` events with
/// [`GpuTraceEventCompact::from_trace_event`], in order. The bulk of the work is processed
/// in fixed groups of eight, followed by a short tail of fewer than eight events.
#[inline(always)]
#[allow(dead_code)]
fn pack_events_to_pinned_scalar(events: &[TraceEvent], pinned: &mut [GpuTraceEventCompact]) {
    const GROUP: usize = 8;
    let len = events.len().min(pinned.len());
    let body_len = len - len % GROUP;
    let (src_body, src_tail) = events[..len].split_at(body_len);
    let (dst_body, dst_tail) = pinned[..len].split_at_mut(body_len);
    for (dst_group, src_group) in dst_body
        .chunks_exact_mut(GROUP)
        .zip(src_body.chunks_exact(GROUP))
    {
        for (slot, event) in dst_group.iter_mut().zip(src_group) {
            *slot = GpuTraceEventCompact::from_trace_event(event);
        }
    }
    for (slot, event) in dst_tail.iter_mut().zip(src_tail) {
        *slot = GpuTraceEventCompact::from_trace_event(event);
    }
}
/// AVX2 packer used by `pack_events_to_pinned_simd`.
///
/// Converts the first `min(events.len(), pinned.len())` events, two at a time, into
/// `pinned`. Each pair of events is assembled into one 256-bit vector and written with a
/// single store. The store kind depends on alignment:
/// - Non-temporal `_mm256_stream_si256` stores, only when `pinned` starts on a 32-byte
///   boundary.
/// - Unaligned `_mm256_storeu_si256` stores, otherwise.
///
/// A trailing odd event is converted with [`GpuTraceEventCompact::from_trace_event`].
///
/// NOTE(review): the lane order below assumes `GpuTraceEventCompact` is 16 bytes, laid out
/// little-endian as `{ ts_ns: u64, (entity & ENTITY_MASK) | error_bit31: u32, latency_us:
/// u32 }`, identical to what `from_trace_event` produces. Confirm against the struct
/// definition.
///
/// # Safety
///
/// The executing CPU must support AVX2 (required by `#[target_feature]`). All memory
/// accesses are bounded by `min(events.len(), pinned.len())`. The layout assumption above
/// must also hold.
#[cfg(all(target_arch = "x86_64", target_feature = "avx2"))]
#[target_feature(enable = "avx2")]
#[inline]
#[allow(unsafe_code)]
unsafe fn pack_events_to_pinned_avx2(events: &[TraceEvent], pinned: &mut [GpuTraceEventCompact]) {
    use core::arch::x86_64::{
        __m256i, _mm256_set_epi32, _mm256_storeu_si256, _mm256_stream_si256, _mm_sfence,
    };
    let n = events.len().min(pinned.len());
    let src_base = events.as_ptr();
    let dst_base = pinned.as_mut_ptr();
    let dst_addr = dst_base as usize;
    // `_mm256_stream_si256` requires 32-byte alignment. Each pair starts 2 elements after
    // the previous one, so an aligned base keeps every pair aligned (given the 16-byte
    // element assumption).
    let use_stream = (dst_addr % 32) == 0;
    let pairs = n / 2;
    for pair_idx in 0..pairs {
        let i = pair_idx * 2;
        let e0 = src_base.add(i);
        let e1 = src_base.add(i + 1);
        #[allow(clippy::cast_ptr_alignment)]
        let d_pair = dst_base.add(i).cast::<__m256i>();
        let ts0 = (*e0).ts_ns;
        let entity0 = (*e0).entity_id;
        let lat0 = (*e0).latency_us;
        // Any non-zero error code is folded into bit 31 of the entity word.
        let err0_bit: u32 = u32::from((*e0).error_code != 0) << 31;
        let ee0: u32 = (entity0 & GpuTraceEventCompact::ENTITY_MASK) | err0_bit;
        let ts1 = (*e1).ts_ns;
        let entity1 = (*e1).entity_id;
        let lat1 = (*e1).latency_us;
        let err1_bit: u32 = u32::from((*e1).error_code != 0) << 31;
        let ee1: u32 = (entity1 & GpuTraceEventCompact::ENTITY_MASK) | err1_bit;
        // `_mm256_set_epi32` lists lanes from highest (7) to lowest (0); lane 0 lands at the
        // lowest address. The low 16 bytes therefore hold event 0 and the high 16 bytes
        // hold event 1.
        #[allow(clippy::cast_possible_wrap)]
        let v: __m256i = _mm256_set_epi32(
            lat1 as i32,
            ee1 as i32,
            (ts1 >> 32) as i32,
            (ts1 & 0xFFFF_FFFF) as i32,
            lat0 as i32,
            ee0 as i32,
            (ts0 >> 32) as i32,
            (ts0 & 0xFFFF_FFFF) as i32,
        );
        if use_stream {
            _mm256_stream_si256(d_pair, v);
        } else {
            _mm256_storeu_si256(d_pair, v);
        }
    }
    // An odd element count leaves one event that does not fill a 256-bit pair.
    if n & 1 != 0 {
        let i = n - 1;
        let src_ref = &*src_base.add(i);
        *dst_base.add(i) = GpuTraceEventCompact::from_trace_event(src_ref);
    }
    // Non-temporal stores are weakly ordered. The fence orders them before any later stores.
    if use_stream {
        _mm_sfence();
    }
}
/// Safe wrapper around [`dsfb_gpu_run_pipeline`] for host-allocated buffers.
///
/// The FFI call receives only raw pointers, so this wrapper first checks that every
/// slice is at least as long as the sizes the callers in this module allocate for
/// `contract`. Those sizes are:
/// - `n_entities * n_windows` for the features and per-cell outputs.
/// - `n_entities * MAX_CANDIDATES_PER_ENTITY` candidate slots.
/// - `n_entities` counts.
///
/// NOTE(review): these are assumed to be the extents the kernel accesses. Confirm on the
/// CUDA side.
///
/// `timings_out` may be null. If non-null, it must point to a live, writable
/// [`PipelineTimingsFfi`].
///
/// Returns the raw pipeline status (`0` on success).
///
/// # Panics
///
/// Panics if any buffer is shorter than the sizes above.
#[allow(clippy::too_many_arguments)]
fn call_kernel(
    features: &[WindowFeature],
    residuals: &mut [ResidualCell],
    signs: &mut [SignCell],
    detectors: &mut [DetectorCell],
    consensus: &mut [ConsensusCell],
    candidate_buf: &mut [CandidateInterval],
    candidate_count: &mut [i32],
    contract: &Contract,
    thresholds: &DetectorThresholdsFfi,
    baseline: Baseline,
    candidate_config: &CandidateConfig,
    timings_out: *mut PipelineTimingsFfi,
) -> c_int {
    let n_entities = contract.n_entities as usize;
    let cells = n_entities * contract.n_windows as usize;
    let candidate_slots = n_entities * MAX_CANDIDATES_PER_ENTITY as usize;
    // Raw pointers carry no length, so a short slice here would be an out-of-bounds
    // access inside the kernel. Fail loudly on the host instead.
    assert!(features.len() >= cells, "features buffer too small for contract");
    assert!(
        residuals.len() >= cells
            && signs.len() >= cells
            && detectors.len() >= cells
            && consensus.len() >= cells,
        "per-cell output buffers too small for contract"
    );
    assert!(
        candidate_buf.len() >= candidate_slots,
        "candidate buffer too small for contract"
    );
    assert!(
        candidate_count.len() >= n_entities,
        "candidate count buffer too small for contract"
    );
    // SAFETY: the slice lengths were checked above. `thresholds` is a live reference.
    // `timings_out` is either null or supplied valid by the caller.
    #[allow(unsafe_code)]
    unsafe {
        dsfb_gpu_run_pipeline(
            features.as_ptr(),
            contract.n_entities as i32,
            contract.n_windows as i32,
            contract.ewma_alpha_q16_raw,
            baseline.latency_us,
            baseline.error_rate_q16_raw,
            std::ptr::from_ref::<DetectorThresholdsFfi>(thresholds),
            candidate_config.min_detector_count as i32,
            candidate_config.min_residual_q_raw,
            candidate_config.min_length_windows as i32,
            MAX_CANDIDATES_PER_ENTITY,
            residuals.as_mut_ptr(),
            signs.as_mut_ptr(),
            detectors.as_mut_ptr(),
            consensus.as_mut_ptr(),
            candidate_buf.as_mut_ptr(),
            candidate_count.as_mut_ptr(),
            timings_out,
        )
    }
}
/// Runs the CUDA pipeline over every catalog in `catalogs` through a single call to
/// [`dsfb_gpu_run_pipeline_batched`]. Returns one throughput-mode [`CaseFile`] per catalog,
/// in input order.
///
/// All workspace host buffers are laid out catalog-major:
/// - `per_catalog_cells()` cells per catalog for features and per-cell outputs. Each
///   catalog's features are computed on the host and written into its slice of
///   `workspace.features`.
/// - `n_entities` counts per catalog.
/// - `per_catalog_candidate_slots()` candidate slots per catalog.
///
/// # Errors
///
/// This function fails in two ways:
/// - It returns [`GpuError::InvalidInput`] when `catalogs.len()` or the contract
///   dimensions do not match the workspace.
/// - It returns [`GpuError::KernelFailed`] when the pipeline reports a non-zero status.
///
/// # Panics
///
/// `copy_from_slice` panics if `compute_features` yields a different number of cells than
/// `per_catalog_cells()`.
pub fn build_gpu_batched_throughput(
    workspace: &mut BatchedGpuWorkspace,
    catalogs: &[&[TraceEvent]],
    contract: &Contract,
) -> Result<Vec<CaseFile>, GpuError> {
    if catalogs.len() != workspace.n_catalogs as usize {
        return Err(GpuError::InvalidInput(
            "BatchedGpuWorkspace.n_catalogs does not match catalogs.len()",
        ));
    }
    if contract.n_entities != workspace.n_entities || contract.n_windows != workspace.n_windows {
        return Err(GpuError::InvalidInput(
            "BatchedGpuWorkspace dimensions do not match the supplied contract",
        ));
    }
    let n_catalogs = workspace.n_catalogs as usize;
    let per_catalog = workspace.per_catalog_cells();
    let window_size_ns = u64::from(contract.window_size_ms) * 1_000_000;
    for (c_idx, &events) in catalogs.iter().enumerate() {
        let features_for_catalog = compute_features(
            events,
            contract.n_windows,
            contract.n_entities,
            window_size_ns,
        );
        let dst = &mut workspace.features[c_idx * per_catalog..(c_idx + 1) * per_catalog];
        dst.copy_from_slice(&features_for_catalog);
    }
    let thresholds = DetectorThresholdsFfi::from(&DetectorThresholds::CANONICAL);
    let baseline = Baseline::CANONICAL;
    let candidate_config = CandidateConfig::CANONICAL;
    // SAFETY: every host pointer refers to a workspace-owned buffer or a local that
    // outlives the call. The catalog count and dimensions were validated against the
    // workspace above. Timings are not requested, so a null pointer is passed.
    #[allow(unsafe_code)]
    let status: c_int = unsafe {
        dsfb_gpu_run_pipeline_batched(
            workspace.d_features(),
            workspace.d_residuals(),
            workspace.d_signs(),
            workspace.d_detectors(),
            workspace.d_consensus(),
            workspace.d_candidates(),
            workspace.d_candidate_count(),
            workspace.features.as_ptr(),
            workspace.n_catalogs as i32,
            workspace.n_entities as i32,
            workspace.n_windows as i32,
            contract.ewma_alpha_q16_raw,
            baseline.latency_us,
            baseline.error_rate_q16_raw,
            std::ptr::from_ref::<DetectorThresholdsFfi>(&thresholds),
            candidate_config.min_detector_count as i32,
            candidate_config.min_residual_q_raw,
            candidate_config.min_length_windows as i32,
            MAX_CANDIDATES_PER_ENTITY,
            workspace.residuals.as_mut_ptr(),
            workspace.signs.as_mut_ptr(),
            workspace.detectors.as_mut_ptr(),
            workspace.consensus.as_mut_ptr(),
            workspace.candidate_buf.as_mut_ptr(),
            workspace.candidate_count.as_mut_ptr(),
            std::ptr::null_mut(),
        )
    };
    if status != 0 {
        return Err(GpuError::KernelFailed(status));
    }
    let n_entities = workspace.n_entities as usize;
    let per_entity = MAX_CANDIDATES_PER_ENTITY as usize;
    let per_candidate_slot = workspace.per_catalog_candidate_slots();
    let mut cases: Vec<CaseFile> = Vec::with_capacity(n_catalogs);
    for (c_idx, &events) in catalogs.iter().enumerate() {
        let cell_range = c_idx * per_catalog..(c_idx + 1) * per_catalog;
        let counts = &workspace.candidate_count[c_idx * n_entities..(c_idx + 1) * n_entities];
        let slots = &workspace.candidate_buf
            [c_idx * per_candidate_slot..(c_idx + 1) * per_candidate_slot];
        let mut candidates: Vec<CandidateInterval> = Vec::new();
        for (entity_id, &raw_count) in counts.iter().enumerate() {
            // The reported count may exceed this entity's slot capacity, or be negative on
            // a malformed result. Clamp it so one entity never reads another entity's
            // slots and never indexes past the buffer.
            let count = usize::try_from(raw_count).unwrap_or(0).min(per_entity);
            let base = entity_id * per_entity;
            candidates.extend_from_slice(&slots[base..base + count]);
        }
        let case = build_from_artifacts_with_mode(
            events,
            contract,
            "cuda",
            EmissionMode::Throughput,
            &workspace.features[cell_range.clone()],
            &workspace.residuals[cell_range.clone()],
            &workspace.signs[cell_range.clone()],
            &workspace.detectors[cell_range.clone()],
            &workspace.consensus[cell_range],
            &candidates,
        );
        cases.push(case);
    }
    Ok(cases)
}
/// Hashes `bytes` through the device SHA-256 self-test entry point
/// (`dsfb_gpu_sha256_self_test`) and returns the 32-byte digest it writes.
///
/// # Errors
///
/// Returns [`GpuError::KernelFailed`] carrying the FFI status when it is non-zero.
pub fn sha256_device(bytes: &[u8]) -> Result<[u8; 32], GpuError> {
    let mut digest = [0u8; 32];
    let byte_len = bytes.len() as u64;
    // SAFETY: `bytes` is readable for `byte_len` bytes and `digest` provides 32 writable
    // bytes. NOTE(review): this assumes the FFI writes at most 32 bytes; confirm on the
    // CUDA side.
    #[allow(unsafe_code)]
    let code = unsafe {
        crate::ffi::dsfb_gpu_sha256_self_test(bytes.as_ptr(), byte_len, digest.as_mut_ptr())
    };
    match code {
        0 => Ok(digest),
        failure => Err(GpuError::KernelFailed(failure)),
    }
}
/// Hashes `bytes` through the streaming device SHA-256 self-test entry point
/// (`dsfb_gpu_sha256_streaming_self_test`) and returns the 32-byte digest it writes.
///
/// # Errors
///
/// Returns [`GpuError::KernelFailed`] carrying the FFI status when it is non-zero.
pub fn sha256_device_streaming(bytes: &[u8]) -> Result<[u8; 32], GpuError> {
    let mut digest = [0u8; 32];
    let input = bytes.as_ptr();
    let input_len = bytes.len() as u64;
    // SAFETY: `input` is readable for `input_len` bytes and `digest` provides 32 writable
    // bytes. NOTE(review): this assumes the FFI writes at most 32 bytes; confirm on the
    // CUDA side.
    #[allow(unsafe_code)]
    let code = unsafe {
        crate::ffi::dsfb_gpu_sha256_streaming_self_test(input, input_len, digest.as_mut_ptr())
    };
    if code == 0 {
        Ok(digest)
    } else {
        Err(GpuError::KernelFailed(code))
    }
}
/// Runs the device-side compact densor-root streaming sweep timing harness.
///
/// All sizing and iteration parameters are forwarded unchanged to
/// `dsfb_gpu_compact_densor_root_streaming_sweep_time`. Returns the `mean_ns` value it
/// writes back.
///
/// # Errors
///
/// Returns [`GpuError::KernelFailed`] carrying the FFI status when it is non-zero.
pub fn compact_densor_root_streaming_sweep_time(
    n_chunks_per_catalog: u32,
    chunk_size: u32,
    stage_id: u32,
    n_catalogs: u32,
    tile_bytes: u32,
    n_warmup: i32,
    n_timed: i32,
) -> Result<u64, GpuError> {
    let mut mean = 0_u64;
    // SAFETY: every argument is passed by value except `mean`, a live local that the FFI
    // writes its result into.
    #[allow(unsafe_code)]
    let code = unsafe {
        crate::ffi::dsfb_gpu_compact_densor_root_streaming_sweep_time(
            n_chunks_per_catalog,
            chunk_size,
            stage_id,
            n_catalogs,
            tile_bytes,
            n_warmup,
            n_timed,
            &mut mean,
        )
    };
    match code {
        0 => Ok(mean),
        failure => Err(GpuError::KernelFailed(failure)),
    }
}
/// Runs the device-side compact densor-root "path 1a" sweep timing harness.
///
/// All sizing and iteration parameters are forwarded unchanged to
/// `dsfb_gpu_compact_densor_root_path1a_sweep_time`. Returns the `mean_ns` value it
/// writes back.
///
/// # Errors
///
/// Returns [`GpuError::KernelFailed`] carrying the FFI status when it is non-zero.
pub fn compact_densor_root_path1a_sweep_time(
    n_chunks_per_catalog: u32,
    chunk_size: u32,
    stage_id: u32,
    n_catalogs: u32,
    n_warmup: i32,
    n_timed: i32,
) -> Result<u64, GpuError> {
    let mut mean = 0_u64;
    // SAFETY: every argument is passed by value except `mean`, a live local that the FFI
    // writes its result into.
    #[allow(unsafe_code)]
    let code = unsafe {
        crate::ffi::dsfb_gpu_compact_densor_root_path1a_sweep_time(
            n_chunks_per_catalog,
            chunk_size,
            stage_id,
            n_catalogs,
            n_warmup,
            n_timed,
            &mut mean,
        )
    };
    if code == 0 {
        Ok(mean)
    } else {
        Err(GpuError::KernelFailed(code))
    }
}
/// Runs the throughput-digest pipeline on `workspace` and builds a throughput
/// [`CaseFile`].
///
/// The case file is built from three inputs:
/// - The host-computed features.
/// - The gathered candidates.
/// - Four 32-byte stage digests written into a 128-byte host buffer, in the order
///   residual, sign, detector, consensus.
///
/// No host per-cell output buffers are handed to the FFI; a single null pointer is passed
/// where the other entry points take host outputs.
///
/// # Errors
///
/// This function fails in two ways:
/// - It propagates the error from `assert_compatible`.
/// - It returns [`GpuError::KernelFailed`] when the pipeline reports a non-zero status.
pub fn build_gpu_throughput_device_digests_on_workspace(
    events: &[TraceEvent],
    contract: &Contract,
    workspace: &mut GpuWorkspace,
) -> Result<CaseFile, GpuError> {
    workspace.assert_compatible(contract)?;
    let n_entities = contract.n_entities as i32;
    let n_windows = contract.n_windows as i32;
    let features = compute_features(
        events,
        contract.n_windows,
        contract.n_entities,
        u64::from(contract.window_size_ms) * 1_000_000,
    );
    let thresholds_ffi = DetectorThresholdsFfi::from(&DetectorThresholds::CANONICAL);
    let baseline = Baseline::CANONICAL;
    let candidate_cfg = CandidateConfig::CANONICAL;
    let mut stage_digests_host = [0u8; 4 * 32];
    // SAFETY: host pointers refer to `features`, `thresholds_ffi`, `stage_digests_host`,
    // and workspace-owned buffers, all of which outlive the call.
    #[allow(unsafe_code)]
    let status: c_int = unsafe {
        crate::ffi::dsfb_gpu_run_pipeline_throughput_digests_on_workspace(
            workspace.d_features(),
            workspace.d_residuals(),
            workspace.d_signs(),
            workspace.d_detectors(),
            workspace.d_consensus(),
            workspace.d_candidates(),
            workspace.d_candidate_count(),
            workspace.d_stage_digests(),
            features.as_ptr(),
            n_entities,
            n_windows,
            contract.ewma_alpha_q16_raw,
            baseline.latency_us,
            baseline.error_rate_q16_raw,
            std::ptr::from_ref(&thresholds_ffi),
            candidate_cfg.min_detector_count as i32,
            candidate_cfg.min_residual_q_raw,
            candidate_cfg.min_length_windows as i32,
            MAX_CANDIDATES_PER_ENTITY,
            std::ptr::null_mut(),
            workspace.candidate_buf.as_mut_ptr(),
            workspace.candidate_count.as_mut_ptr(),
            stage_digests_host.as_mut_ptr(),
        )
    };
    if status != 0 {
        return Err(GpuError::KernelFailed(status));
    }
    let per_entity = MAX_CANDIDATES_PER_ENTITY as usize;
    let mut candidates: Vec<CandidateInterval> = Vec::new();
    for entity_id in 0..(workspace.n_entities as usize) {
        // The reported count may exceed this entity's slot capacity, or be negative on a
        // malformed result. Clamp it so one entity never reads another entity's slots and
        // never indexes past the buffer.
        let count = usize::try_from(workspace.candidate_count[entity_id])
            .unwrap_or(0)
            .min(per_entity);
        let base = entity_id * per_entity;
        candidates.extend_from_slice(&workspace.candidate_buf[base..base + count]);
    }
    let mut residual_digest = [0u8; 32];
    let mut sign_digest = [0u8; 32];
    let mut detector_digest = [0u8; 32];
    let mut consensus_digest = [0u8; 32];
    residual_digest.copy_from_slice(&stage_digests_host[0..32]);
    sign_digest.copy_from_slice(&stage_digests_host[32..64]);
    detector_digest.copy_from_slice(&stage_digests_host[64..96]);
    consensus_digest.copy_from_slice(&stage_digests_host[96..128]);
    Ok(
        dsfb_gpu_debug_core::casefile::build_throughput_from_artifacts_and_device_digests(
            events,
            contract,
            "cuda",
            &features,
            residual_digest,
            sign_digest,
            detector_digest,
            consensus_digest,
            &[],
            &candidates,
        ),
    )
}
/// Batched variant of [`build_gpu_throughput_device_digests_on_workspace`].
///
/// Runs every catalog through `dsfb_gpu_run_pipeline_batched_throughput_digests` and
/// returns one throughput [`CaseFile`] per catalog, in input order.
///
/// Host buffers are laid out as follows:
/// - Features, counts, and candidate slots are catalog-major slices of the workspace.
/// - The digest buffer is stage-major: `[stage][catalog][32 bytes]`, with the stages
///   ordered residual, sign, detector, consensus.
///
/// # Errors
///
/// This function fails in two ways:
/// - It returns [`GpuError::InvalidInput`] when `catalogs.len()` or the contract
///   dimensions do not match the workspace, or when the catalog count does not fit in the
///   `i32` the FFI takes.
/// - It returns [`GpuError::KernelFailed`] when the pipeline reports a non-zero status.
#[allow(clippy::too_many_lines)]
pub fn build_gpu_batched_throughput_device_digests(
    workspace: &mut BatchedGpuWorkspace,
    catalogs: &[&[TraceEvent]],
    contract: &Contract,
) -> Result<Vec<CaseFile>, GpuError> {
    if catalogs.len() != workspace.n_catalogs as usize {
        return Err(GpuError::InvalidInput(
            "Catalog slice length does not match BatchedGpuWorkspace n_catalogs",
        ));
    }
    if contract.n_entities != workspace.n_entities || contract.n_windows != workspace.n_windows {
        return Err(GpuError::InvalidInput(
            "BatchedGpuWorkspace dimensions do not match the supplied contract",
        ));
    }
    let n_catalogs = catalogs.len();
    // Previously this was coerced with `unwrap_or(i32::MAX)`, which would silently tell
    // the kernel a different catalog count than the buffers are sized for.
    let n_catalogs_ffi = i32::try_from(n_catalogs).map_err(|_| {
        GpuError::InvalidInput("Catalog count does not fit in the i32 expected by the CUDA pipeline")
    })?;
    let n_entities = contract.n_entities as i32;
    let n_windows = contract.n_windows as i32;
    let per_catalog = workspace.per_catalog_cells();
    let per_candidate_slot = workspace.per_catalog_candidate_slots();
    for (c_idx, events) in catalogs.iter().enumerate() {
        let features = compute_features(
            events,
            contract.n_windows,
            contract.n_entities,
            u64::from(contract.window_size_ms) * 1_000_000,
        );
        let dst = &mut workspace.features[c_idx * per_catalog..(c_idx + 1) * per_catalog];
        dst.copy_from_slice(&features);
    }
    let thresholds_ffi = DetectorThresholdsFfi::from(&DetectorThresholds::CANONICAL);
    let baseline = Baseline::CANONICAL;
    let candidate_cfg = CandidateConfig::CANONICAL;
    let mut stage_digests_host = vec![0u8; 4 * 32 * n_catalogs];
    // SAFETY: host pointers refer to workspace-owned buffers, `thresholds_ffi`, and
    // `stage_digests_host` (sized 4 stages x 32 bytes x `n_catalogs`), all of which outlive
    // the call.
    #[allow(unsafe_code)]
    let status: c_int = unsafe {
        crate::ffi::dsfb_gpu_run_pipeline_batched_throughput_digests(
            workspace.d_features(),
            workspace.d_residuals(),
            workspace.d_signs(),
            workspace.d_detectors(),
            workspace.d_consensus(),
            workspace.d_candidates(),
            workspace.d_candidate_count(),
            workspace.d_stage_digests(),
            workspace.features.as_ptr(),
            n_catalogs_ffi,
            n_entities,
            n_windows,
            contract.ewma_alpha_q16_raw,
            baseline.latency_us,
            baseline.error_rate_q16_raw,
            std::ptr::from_ref(&thresholds_ffi),
            candidate_cfg.min_detector_count as i32,
            candidate_cfg.min_residual_q_raw,
            candidate_cfg.min_length_windows as i32,
            MAX_CANDIDATES_PER_ENTITY,
            std::ptr::null_mut(),
            workspace.candidate_buf.as_mut_ptr(),
            workspace.candidate_count.as_mut_ptr(),
            stage_digests_host.as_mut_ptr(),
        )
    };
    if status != 0 {
        return Err(GpuError::KernelFailed(status));
    }
    let entities = workspace.n_entities as usize;
    let per_entity = MAX_CANDIDATES_PER_ENTITY as usize;
    // Distance between consecutive stages in the stage-major digest buffer.
    let stage_block = 32 * n_catalogs;
    let digest_at = |offset: usize| -> [u8; 32] {
        let mut digest = [0u8; 32];
        digest.copy_from_slice(&stage_digests_host[offset..offset + 32]);
        digest
    };
    let mut cases: Vec<CaseFile> = Vec::with_capacity(n_catalogs);
    for (c_idx, &events) in catalogs.iter().enumerate() {
        let cell_range = c_idx * per_catalog..(c_idx + 1) * per_catalog;
        let counts = &workspace.candidate_count[c_idx * entities..(c_idx + 1) * entities];
        let slots = &workspace.candidate_buf
            [c_idx * per_candidate_slot..(c_idx + 1) * per_candidate_slot];
        let mut candidates: Vec<CandidateInterval> = Vec::new();
        for (entity_id, &raw_count) in counts.iter().enumerate() {
            // The reported count may exceed this entity's slot capacity, or be negative on
            // a malformed result. Clamp it so one entity never reads another entity's
            // slots and never indexes past the buffer.
            let count = usize::try_from(raw_count).unwrap_or(0).min(per_entity);
            let base = entity_id * per_entity;
            candidates.extend_from_slice(&slots[base..base + count]);
        }
        let digest_offset = c_idx * 32;
        let case =
            dsfb_gpu_debug_core::casefile::build_throughput_from_artifacts_and_device_digests(
                events,
                contract,
                "cuda",
                &workspace.features[cell_range],
                digest_at(digest_offset),
                digest_at(stage_block + digest_offset),
                digest_at(2 * stage_block + digest_offset),
                digest_at(3 * stage_block + digest_offset),
                &[],
                &candidates,
            );
        cases.push(case);
    }
    Ok(cases)
}
/// Runs the throughput-digest pipeline on `workspace` and returns a
/// [`CompactCaseSummary`](dsfb_gpu_debug_core::casefile::CompactCaseSummary) instead of a
/// full case file.
///
/// The summary is built from three inputs:
/// - The host-computed features.
/// - The gathered candidates.
/// - Four 32-byte stage digests, ordered residual, sign, detector, consensus in a
///   128-byte host buffer.
///
/// # Errors
///
/// This function fails in two ways:
/// - It propagates the error from `assert_compatible`.
/// - It returns [`GpuError::KernelFailed`] when the pipeline reports a non-zero status.
pub fn build_gpu_layer_a_on_workspace(
    events: &[TraceEvent],
    contract: &Contract,
    workspace: &mut GpuWorkspace,
) -> Result<dsfb_gpu_debug_core::casefile::CompactCaseSummary, GpuError> {
    workspace.assert_compatible(contract)?;
    let n_entities = contract.n_entities as i32;
    let n_windows = contract.n_windows as i32;
    let features = compute_features(
        events,
        contract.n_windows,
        contract.n_entities,
        u64::from(contract.window_size_ms) * 1_000_000,
    );
    let thresholds_ffi = DetectorThresholdsFfi::from(&DetectorThresholds::CANONICAL);
    let baseline = Baseline::CANONICAL;
    let candidate_cfg = CandidateConfig::CANONICAL;
    let mut stage_digests_host = [0u8; 4 * 32];
    // SAFETY: host pointers refer to `features`, `thresholds_ffi`, `stage_digests_host`,
    // and workspace-owned buffers, all of which outlive the call.
    #[allow(unsafe_code)]
    let status: c_int = unsafe {
        crate::ffi::dsfb_gpu_run_pipeline_throughput_digests_on_workspace(
            workspace.d_features(),
            workspace.d_residuals(),
            workspace.d_signs(),
            workspace.d_detectors(),
            workspace.d_consensus(),
            workspace.d_candidates(),
            workspace.d_candidate_count(),
            workspace.d_stage_digests(),
            features.as_ptr(),
            n_entities,
            n_windows,
            contract.ewma_alpha_q16_raw,
            baseline.latency_us,
            baseline.error_rate_q16_raw,
            std::ptr::from_ref(&thresholds_ffi),
            candidate_cfg.min_detector_count as i32,
            candidate_cfg.min_residual_q_raw,
            candidate_cfg.min_length_windows as i32,
            MAX_CANDIDATES_PER_ENTITY,
            std::ptr::null_mut(),
            workspace.candidate_buf.as_mut_ptr(),
            workspace.candidate_count.as_mut_ptr(),
            stage_digests_host.as_mut_ptr(),
        )
    };
    if status != 0 {
        return Err(GpuError::KernelFailed(status));
    }
    let per_entity = MAX_CANDIDATES_PER_ENTITY as usize;
    let mut candidates: Vec<CandidateInterval> = Vec::new();
    for entity_id in 0..(workspace.n_entities as usize) {
        // The reported count may exceed this entity's slot capacity, or be negative on a
        // malformed result. Clamp it so one entity never reads another entity's slots and
        // never indexes past the buffer.
        let count = usize::try_from(workspace.candidate_count[entity_id])
            .unwrap_or(0)
            .min(per_entity);
        let base = entity_id * per_entity;
        candidates.extend_from_slice(&workspace.candidate_buf[base..base + count]);
    }
    let mut residual_digest = [0u8; 32];
    let mut sign_digest = [0u8; 32];
    let mut detector_digest = [0u8; 32];
    let mut consensus_digest = [0u8; 32];
    residual_digest.copy_from_slice(&stage_digests_host[0..32]);
    sign_digest.copy_from_slice(&stage_digests_host[32..64]);
    detector_digest.copy_from_slice(&stage_digests_host[64..96]);
    consensus_digest.copy_from_slice(&stage_digests_host[96..128]);
    Ok(
        dsfb_gpu_debug_core::casefile::build_compact_summary_from_device_digests(
            events,
            contract,
            "cuda",
            &features,
            residual_digest,
            sign_digest,
            detector_digest,
            consensus_digest,
            &candidates,
        ),
    )
}
/// Batched variant of [`build_gpu_layer_a_on_workspace`].
///
/// Runs every catalog through `dsfb_gpu_run_pipeline_batched_throughput_digests` and
/// returns one compact summary per catalog, in input order.
///
/// Host buffers are laid out as follows:
/// - Features, counts, and candidate slots are catalog-major slices of the workspace.
/// - The digest buffer is stage-major: `[stage][catalog][32 bytes]`, with the stages
///   ordered residual, sign, detector, consensus.
///
/// # Errors
///
/// This function fails in two ways:
/// - It returns [`GpuError::InvalidInput`] when `catalogs.len()` or the contract
///   dimensions do not match the workspace, or when the catalog count does not fit in the
///   `i32` the FFI takes.
/// - It returns [`GpuError::KernelFailed`] when the pipeline reports a non-zero status.
#[allow(clippy::too_many_lines)]
pub fn build_gpu_layer_a_batched(
    workspace: &mut BatchedGpuWorkspace,
    catalogs: &[&[TraceEvent]],
    contract: &Contract,
) -> Result<Vec<dsfb_gpu_debug_core::casefile::CompactCaseSummary>, GpuError> {
    if catalogs.len() != workspace.n_catalogs as usize {
        return Err(GpuError::InvalidInput(
            "Catalog slice length does not match BatchedGpuWorkspace n_catalogs",
        ));
    }
    if contract.n_entities != workspace.n_entities || contract.n_windows != workspace.n_windows {
        return Err(GpuError::InvalidInput(
            "BatchedGpuWorkspace dimensions do not match the supplied contract",
        ));
    }
    let n_catalogs = catalogs.len();
    // Previously this was coerced with `unwrap_or(i32::MAX)`, which would silently tell
    // the kernel a different catalog count than the buffers are sized for.
    let n_catalogs_ffi = i32::try_from(n_catalogs).map_err(|_| {
        GpuError::InvalidInput("Catalog count does not fit in the i32 expected by the CUDA pipeline")
    })?;
    let n_entities = contract.n_entities as i32;
    let n_windows = contract.n_windows as i32;
    let per_catalog = workspace.per_catalog_cells();
    let per_candidate_slot = workspace.per_catalog_candidate_slots();
    for (c_idx, events) in catalogs.iter().enumerate() {
        let features = compute_features(
            events,
            contract.n_windows,
            contract.n_entities,
            u64::from(contract.window_size_ms) * 1_000_000,
        );
        let dst = &mut workspace.features[c_idx * per_catalog..(c_idx + 1) * per_catalog];
        dst.copy_from_slice(&features);
    }
    let thresholds_ffi = DetectorThresholdsFfi::from(&DetectorThresholds::CANONICAL);
    let baseline = Baseline::CANONICAL;
    let candidate_cfg = CandidateConfig::CANONICAL;
    let mut stage_digests_host = vec![0u8; 4 * 32 * n_catalogs];
    // SAFETY: host pointers refer to workspace-owned buffers, `thresholds_ffi`, and
    // `stage_digests_host` (sized 4 stages x 32 bytes x `n_catalogs`), all of which outlive
    // the call.
    #[allow(unsafe_code)]
    let status: c_int = unsafe {
        crate::ffi::dsfb_gpu_run_pipeline_batched_throughput_digests(
            workspace.d_features(),
            workspace.d_residuals(),
            workspace.d_signs(),
            workspace.d_detectors(),
            workspace.d_consensus(),
            workspace.d_candidates(),
            workspace.d_candidate_count(),
            workspace.d_stage_digests(),
            workspace.features.as_ptr(),
            n_catalogs_ffi,
            n_entities,
            n_windows,
            contract.ewma_alpha_q16_raw,
            baseline.latency_us,
            baseline.error_rate_q16_raw,
            std::ptr::from_ref(&thresholds_ffi),
            candidate_cfg.min_detector_count as i32,
            candidate_cfg.min_residual_q_raw,
            candidate_cfg.min_length_windows as i32,
            MAX_CANDIDATES_PER_ENTITY,
            std::ptr::null_mut(),
            workspace.candidate_buf.as_mut_ptr(),
            workspace.candidate_count.as_mut_ptr(),
            stage_digests_host.as_mut_ptr(),
        )
    };
    if status != 0 {
        return Err(GpuError::KernelFailed(status));
    }
    let entities = workspace.n_entities as usize;
    let per_entity = MAX_CANDIDATES_PER_ENTITY as usize;
    // Distance between consecutive stages in the stage-major digest buffer.
    let stage_block = 32 * n_catalogs;
    let digest_at = |offset: usize| -> [u8; 32] {
        let mut digest = [0u8; 32];
        digest.copy_from_slice(&stage_digests_host[offset..offset + 32]);
        digest
    };
    let mut summaries: Vec<dsfb_gpu_debug_core::casefile::CompactCaseSummary> =
        Vec::with_capacity(n_catalogs);
    for (c_idx, &events) in catalogs.iter().enumerate() {
        let cell_range = c_idx * per_catalog..(c_idx + 1) * per_catalog;
        let counts = &workspace.candidate_count[c_idx * entities..(c_idx + 1) * entities];
        let slots = &workspace.candidate_buf
            [c_idx * per_candidate_slot..(c_idx + 1) * per_candidate_slot];
        let mut candidates: Vec<CandidateInterval> = Vec::new();
        for (entity_id, &raw_count) in counts.iter().enumerate() {
            // The reported count may exceed this entity's slot capacity, or be negative on
            // a malformed result. Clamp it so one entity never reads another entity's
            // slots and never indexes past the buffer.
            let count = usize::try_from(raw_count).unwrap_or(0).min(per_entity);
            let base = entity_id * per_entity;
            candidates.extend_from_slice(&slots[base..base + count]);
        }
        let digest_offset = c_idx * 32;
        let summary = dsfb_gpu_debug_core::casefile::build_compact_summary_from_device_digests(
            events,
            contract,
            "cuda",
            &workspace.features[cell_range],
            digest_at(digest_offset),
            digest_at(stage_block + digest_offset),
            digest_at(2 * stage_block + digest_offset),
            digest_at(3 * stage_block + digest_offset),
            &candidates,
        );
        summaries.push(summary);
    }
    Ok(summaries)
}
/// Throughput-digest build using the fused pipeline entry point
/// (`dsfb_gpu_run_pipeline_fused_throughput_digests_on_workspace`).
///
/// It also passes the workspace's `d_drifts()` handle to the FFI. The case file is built
/// from three inputs:
/// - The host features.
/// - The gathered candidates.
/// - Four 32-byte stage digests, ordered residual, sign, detector, consensus in a
///   128-byte host buffer.
///
/// # Errors
///
/// This function fails in two ways:
/// - It propagates the error from `assert_compatible`.
/// - It returns [`GpuError::KernelFailed`] when the pipeline reports a non-zero status.
pub fn build_gpu_fused_throughput_digests_on_workspace(
    events: &[TraceEvent],
    contract: &Contract,
    workspace: &mut GpuWorkspace,
) -> Result<CaseFile, GpuError> {
    workspace.assert_compatible(contract)?;
    let n_entities = contract.n_entities as i32;
    let n_windows = contract.n_windows as i32;
    let features = compute_features(
        events,
        contract.n_windows,
        contract.n_entities,
        u64::from(contract.window_size_ms) * 1_000_000,
    );
    let thresholds_ffi = DetectorThresholdsFfi::from(&DetectorThresholds::CANONICAL);
    let baseline = Baseline::CANONICAL;
    let candidate_cfg = CandidateConfig::CANONICAL;
    let mut stage_digests_host = [0u8; 4 * 32];
    // SAFETY: host pointers refer to `features`, `thresholds_ffi`, `stage_digests_host`,
    // and workspace-owned buffers, all of which outlive the call.
    #[allow(unsafe_code)]
    let status: c_int = unsafe {
        crate::ffi::dsfb_gpu_run_pipeline_fused_throughput_digests_on_workspace(
            workspace.d_features(),
            workspace.d_residuals(),
            workspace.d_signs(),
            workspace.d_detectors(),
            workspace.d_consensus(),
            workspace.d_candidates(),
            workspace.d_candidate_count(),
            workspace.d_stage_digests(),
            workspace.d_drifts(),
            features.as_ptr(),
            n_entities,
            n_windows,
            contract.ewma_alpha_q16_raw,
            baseline.latency_us,
            baseline.error_rate_q16_raw,
            std::ptr::from_ref(&thresholds_ffi),
            candidate_cfg.min_detector_count as i32,
            candidate_cfg.min_residual_q_raw,
            candidate_cfg.min_length_windows as i32,
            MAX_CANDIDATES_PER_ENTITY,
            std::ptr::null_mut(),
            workspace.candidate_buf.as_mut_ptr(),
            workspace.candidate_count.as_mut_ptr(),
            stage_digests_host.as_mut_ptr(),
        )
    };
    if status != 0 {
        return Err(GpuError::KernelFailed(status));
    }
    let per_entity = MAX_CANDIDATES_PER_ENTITY as usize;
    let mut candidates: Vec<CandidateInterval> = Vec::new();
    for entity_id in 0..(workspace.n_entities as usize) {
        // The reported count may exceed this entity's slot capacity, or be negative on a
        // malformed result. Clamp it so one entity never reads another entity's slots and
        // never indexes past the buffer.
        let count = usize::try_from(workspace.candidate_count[entity_id])
            .unwrap_or(0)
            .min(per_entity);
        let base = entity_id * per_entity;
        candidates.extend_from_slice(&workspace.candidate_buf[base..base + count]);
    }
    let mut residual_digest = [0u8; 32];
    let mut sign_digest = [0u8; 32];
    let mut detector_digest = [0u8; 32];
    let mut consensus_digest = [0u8; 32];
    residual_digest.copy_from_slice(&stage_digests_host[0..32]);
    sign_digest.copy_from_slice(&stage_digests_host[32..64]);
    detector_digest.copy_from_slice(&stage_digests_host[64..96]);
    consensus_digest.copy_from_slice(&stage_digests_host[96..128]);
    Ok(
        dsfb_gpu_debug_core::casefile::build_throughput_from_artifacts_and_device_digests(
            events,
            contract,
            "cuda",
            &features,
            residual_digest,
            sign_digest,
            detector_digest,
            consensus_digest,
            &[],
            &candidates,
        ),
    )
}
/// Throughput-digest build that stages data through the workspace's pinned host shadows
/// and its stream handle.
///
/// The FFI call is `dsfb_gpu_run_pipeline_throughput_digests_async_on_workspace`:
/// - Host features are copied into `features_pinned`.
/// - Candidates, counts, and the four 32-byte stage digests are read back from their
///   pinned buffers after the call.
/// - The digests are ordered residual, sign, detector, consensus.
///
/// # Errors
///
/// This function fails in three ways:
/// - It propagates the error from `assert_compatible`.
/// - It returns [`GpuError::InvalidInput`] when the workspace has no pinned shadows or
///   stream.
/// - It returns [`GpuError::KernelFailed`] when the pipeline reports a non-zero status.
///
/// # Panics
///
/// The `expect`s rely on `has_pinned_async()` implying every pinned buffer is present.
/// They also panic if `features_pinned` differs in length from the computed features
/// (`copy_from_slice`).
#[allow(clippy::expect_used, clippy::too_many_lines)]
pub fn build_gpu_throughput_pinned_async_on_workspace(
    events: &[TraceEvent],
    contract: &Contract,
    workspace: &mut GpuWorkspace,
) -> Result<CaseFile, GpuError> {
    workspace.assert_compatible(contract)?;
    if !workspace.has_pinned_async() {
        return Err(GpuError::InvalidInput(
            "GpuWorkspace was not built with pinned shadows + stream; \
             call GpuWorkspace::new_with_pinned_async(contract)",
        ));
    }
    let n_entities = contract.n_entities as i32;
    let n_windows = contract.n_windows as i32;
    let features = compute_features(
        events,
        contract.n_windows,
        contract.n_entities,
        u64::from(contract.window_size_ms) * 1_000_000,
    );
    {
        let pinned = workspace
            .features_pinned
            .as_mut()
            .expect("has_pinned_async guarantees features_pinned is Some");
        pinned.as_mut_slice().copy_from_slice(&features);
    }
    let thresholds_ffi = DetectorThresholdsFfi::from(&DetectorThresholds::CANONICAL);
    let baseline = Baseline::CANONICAL;
    let candidate_cfg = CandidateConfig::CANONICAL;
    let features_ptr = workspace
        .features_pinned
        .as_ref()
        .expect("guarded above")
        .as_ptr();
    let candidates_ptr = workspace
        .candidates_pinned
        .as_mut()
        .expect("guarded above")
        .as_mut_ptr();
    let count_ptr = workspace
        .candidate_count_pinned
        .as_mut()
        .expect("guarded above")
        .as_mut_ptr();
    let digests_ptr = workspace
        .stage_digests_pinned
        .as_mut()
        .expect("guarded above")
        .as_mut_ptr();
    let stream = workspace.stream_handle();
    // SAFETY: the pinned pointers come from workspace-owned buffers that outlive the call.
    // `thresholds_ffi` is a live local.
    // NOTE(review): the pinned buffers are read right after this returns. That assumes the
    // FFI synchronizes `stream` before reporting status; confirm on the CUDA side.
    #[allow(unsafe_code)]
    let status: c_int = unsafe {
        crate::ffi::dsfb_gpu_run_pipeline_throughput_digests_async_on_workspace(
            workspace.d_features(),
            workspace.d_residuals(),
            workspace.d_signs(),
            workspace.d_detectors(),
            workspace.d_consensus(),
            workspace.d_candidates(),
            workspace.d_candidate_count(),
            workspace.d_stage_digests(),
            features_ptr,
            n_entities,
            n_windows,
            contract.ewma_alpha_q16_raw,
            baseline.latency_us,
            baseline.error_rate_q16_raw,
            std::ptr::from_ref(&thresholds_ffi),
            candidate_cfg.min_detector_count as i32,
            candidate_cfg.min_residual_q_raw,
            candidate_cfg.min_length_windows as i32,
            MAX_CANDIDATES_PER_ENTITY,
            std::ptr::null_mut(),
            candidates_ptr,
            count_ptr,
            digests_ptr,
            stream,
            c_int::from(workspace.has_const_thresholds()),
            std::ptr::null_mut(),
        )
    };
    if status != 0 {
        return Err(GpuError::KernelFailed(status));
    }
    let count_slice = workspace
        .candidate_count_pinned
        .as_ref()
        .expect("guarded above")
        .as_slice();
    let slots = workspace
        .candidates_pinned
        .as_ref()
        .expect("guarded above")
        .as_slice();
    let per_entity = MAX_CANDIDATES_PER_ENTITY as usize;
    let mut candidates: Vec<CandidateInterval> = Vec::new();
    for entity_id in 0..(workspace.n_entities as usize) {
        // The reported count may exceed this entity's slot capacity, or be negative on a
        // malformed result. Clamp it so one entity never reads another entity's slots and
        // never indexes past the buffer.
        let count = usize::try_from(count_slice[entity_id])
            .unwrap_or(0)
            .min(per_entity);
        let base = entity_id * per_entity;
        candidates.extend_from_slice(&slots[base..base + count]);
    }
    let stage_digests_host = workspace
        .stage_digests_pinned
        .as_ref()
        .expect("guarded above")
        .as_slice();
    let mut residual_digest = [0u8; 32];
    let mut sign_digest = [0u8; 32];
    let mut detector_digest = [0u8; 32];
    let mut consensus_digest = [0u8; 32];
    residual_digest.copy_from_slice(&stage_digests_host[0..32]);
    sign_digest.copy_from_slice(&stage_digests_host[32..64]);
    detector_digest.copy_from_slice(&stage_digests_host[64..96]);
    consensus_digest.copy_from_slice(&stage_digests_host[96..128]);
    Ok(
        dsfb_gpu_debug_core::casefile::build_throughput_from_artifacts_and_device_digests(
            events,
            contract,
            "cuda",
            &features,
            residual_digest,
            sign_digest,
            detector_digest,
            consensus_digest,
            &[],
            &candidates,
        ),
    )
}
/// Runs the throughput pipeline through a captured CUDA graph when possible, and
/// falls back to the stream-based path when capture is demoted.
///
/// `GpuWorkspace::try_capture_throughput_graph` decides which path is taken:
///
/// * `Demoted { .. }`: the case file is produced by
///   [`build_gpu_throughput_pinned_async_on_workspace`], and the demotion status is
///   returned unchanged.
/// * `Captured { plan_hash }`: host features are staged in the pinned shadow, the
///   graph is launched with `dsfb_gpu_launch_throughput_graph`, and the pinned
///   candidate and digest shadows are read back.
///
/// # Errors
///
/// * Any error from `assert_compatible` or `try_capture_throughput_graph`.
/// * [`GpuError::InvalidInput`] when the workspace was built without pinned
///   shadows + stream.
/// * [`GpuError::KernelFailed`] carrying the non-zero launch status.
/// * Any error from the demoted fallback path.
#[allow(clippy::expect_used, clippy::too_many_lines)]
pub fn build_gpu_throughput_graph_or_demote(
    events: &[TraceEvent],
    contract: &Contract,
    workspace: &mut GpuWorkspace,
) -> Result<(CaseFile, GraphCaptureStatus), GpuError> {
    workspace.assert_compatible(contract)?;
    if !workspace.has_pinned_async() {
        return Err(GpuError::InvalidInput(
            "GpuWorkspace was not built with pinned shadows + stream; \
             call GpuWorkspace::new_with_pinned_async(contract)",
        ));
    }
    let status = workspace.try_capture_throughput_graph(contract)?;
    let plan_hash = match status {
        // No bindings here, so `status` is still whole and can be returned as-is
        // instead of being rebuilt from a cloned `reason`.
        GraphCaptureStatus::Demoted { .. } => {
            let case = build_gpu_throughput_pinned_async_on_workspace(events, contract, workspace)?;
            return Ok((case, status));
        }
        GraphCaptureStatus::Captured { plan_hash } => plan_hash,
    };
    let features = compute_features(
        events,
        contract.n_windows,
        contract.n_entities,
        u64::from(contract.window_size_ms) * 1_000_000,
    );
    // NOTE(review): this assumes the captured graph reads its input from
    // `features_pinned` and writes the pinned candidate/digest shadows; confirm
    // against the capture code.
    {
        let pinned = workspace
            .features_pinned
            .as_mut()
            .expect("has_pinned_async guarantees features_pinned is Some");
        pinned.as_mut_slice().copy_from_slice(&features);
    }
    let stream = workspace.stream_handle();
    let graph_exec = workspace.graph_exec();
    debug_assert!(graph_exec != 0, "Captured status implies graph_exec != 0");
    #[allow(unsafe_code)]
    let launch_status: c_int =
        unsafe { crate::ffi::dsfb_gpu_launch_throughput_graph(graph_exec, stream) };
    if launch_status != 0 {
        return Err(GpuError::KernelFailed(launch_status));
    }
    // NOTE(review): pinned shadows are read right after the launch call returns.
    // This assumes it synchronizes `stream` before returning; confirm.
    let count_slice = workspace
        .candidate_count_pinned
        .as_ref()
        .expect("has_pinned_async guarantees candidate_count_pinned is Some")
        .as_slice();
    let slots = workspace
        .candidates_pinned
        .as_ref()
        .expect("has_pinned_async guarantees candidates_pinned is Some")
        .as_slice();
    // Clamp the device-reported count to the per-entity slot capacity so a bad
    // count cannot read into the neighbouring entity's slots.
    let per_entity = MAX_CANDIDATES_PER_ENTITY as usize;
    let counts = &count_slice[..workspace.n_entities as usize];
    let mut candidates: Vec<CandidateInterval> = Vec::new();
    for (entity_id, &raw_count) in counts.iter().enumerate() {
        let count = (raw_count as usize).min(per_entity);
        let base = entity_id * per_entity;
        candidates.extend_from_slice(&slots[base..base + count]);
    }
    // Four consecutive 32-byte digests: residual, sign, detector, consensus.
    let stage_digests_host = workspace
        .stage_digests_pinned
        .as_ref()
        .expect("has_pinned_async guarantees stage_digests_pinned is Some")
        .as_slice();
    let mut residual_digest = [0u8; 32];
    let mut sign_digest = [0u8; 32];
    let mut detector_digest = [0u8; 32];
    let mut consensus_digest = [0u8; 32];
    residual_digest.copy_from_slice(&stage_digests_host[0..32]);
    sign_digest.copy_from_slice(&stage_digests_host[32..64]);
    detector_digest.copy_from_slice(&stage_digests_host[64..96]);
    consensus_digest.copy_from_slice(&stage_digests_host[96..128]);
    let case = dsfb_gpu_debug_core::casefile::build_throughput_from_artifacts_and_device_digests(
        events,
        contract,
        "cuda",
        &features,
        residual_digest,
        sign_digest,
        detector_digest,
        consensus_digest,
        &[],
        &candidates,
    );
    Ok((case, GraphCaptureStatus::Captured { plan_hash }))
}
/// Wall-clock timings, in microseconds, for the host (CPU) stages of the timed
/// throughput pipeline. All fields are zero by default.
#[derive(Clone, Copy, Debug, Default)]
pub struct R8HostStageTimings {
    /// Microseconds spent extracting window features on the host.
    pub features_us: f32,
    /// Microseconds spent assembling the final case file on the host.
    pub bank_and_finalize_us: f32,
}
pub use crate::ffi::R8StageTimingsFfi as R8StageTimings;
/// Same pipeline as [`build_gpu_throughput_pinned_async_on_workspace`], but also
/// returns per-stage timings.
///
/// * Device-side stage timings are written by the FFI call into an
///   [`R8StageTimings`] passed by pointer.
/// * Host-side timings ([`R8HostStageTimings`]) are measured with
///   `std::time::Instant`. One span covers `compute_features`; the other covers
///   building the case file from device digests.
///
/// # Errors
///
/// * Any error from `assert_compatible`.
/// * [`GpuError::InvalidInput`] when the workspace was built without pinned
///   shadows + stream.
/// * [`GpuError::KernelFailed`] carrying the non-zero FFI status code.
#[allow(clippy::expect_used, clippy::too_many_lines)]
pub fn build_gpu_throughput_pinned_async_on_workspace_timed(
    events: &[TraceEvent],
    contract: &Contract,
    workspace: &mut GpuWorkspace,
) -> Result<(CaseFile, R8StageTimings, R8HostStageTimings), GpuError> {
    use std::time::Instant;
    workspace.assert_compatible(contract)?;
    if !workspace.has_pinned_async() {
        return Err(GpuError::InvalidInput(
            "GpuWorkspace was not built with pinned shadows + stream; \
             call GpuWorkspace::new_with_pinned_async(contract)",
        ));
    }
    let n_entities = contract.n_entities as i32;
    let n_windows = contract.n_windows as i32;
    let t_features = Instant::now();
    let features = compute_features(
        events,
        contract.n_windows,
        contract.n_entities,
        u64::from(contract.window_size_ms) * 1_000_000,
    );
    #[allow(clippy::cast_precision_loss)]
    let features_us = (t_features.elapsed().as_nanos() as u64) as f32 / 1_000.0_f32;
    // Stage host features into the pinned shadow that the FFI call reads from.
    {
        let pinned = workspace
            .features_pinned
            .as_mut()
            .expect("has_pinned_async guarantees features_pinned is Some");
        pinned.as_mut_slice().copy_from_slice(&features);
    }
    let thresholds_ffi = DetectorThresholdsFfi::from(&DetectorThresholds::CANONICAL);
    let baseline = Baseline::CANONICAL;
    let candidate_cfg = CandidateConfig::CANONICAL;
    let features_ptr = workspace
        .features_pinned
        .as_ref()
        .expect("guarded above")
        .as_ptr();
    let candidates_ptr = workspace
        .candidates_pinned
        .as_mut()
        .expect("guarded above")
        .as_mut_ptr();
    let count_ptr = workspace
        .candidate_count_pinned
        .as_mut()
        .expect("guarded above")
        .as_mut_ptr();
    let digests_ptr = workspace
        .stage_digests_pinned
        .as_mut()
        .expect("guarded above")
        .as_mut_ptr();
    let stream = workspace.stream_handle();
    let mut device_timings = R8StageTimings::default();
    #[allow(unsafe_code)]
    let status: c_int = unsafe {
        crate::ffi::dsfb_gpu_run_pipeline_throughput_digests_async_on_workspace(
            workspace.d_features(),
            workspace.d_residuals(),
            workspace.d_signs(),
            workspace.d_detectors(),
            workspace.d_consensus(),
            workspace.d_candidates(),
            workspace.d_candidate_count(),
            workspace.d_stage_digests(),
            features_ptr,
            n_entities,
            n_windows,
            contract.ewma_alpha_q16_raw,
            baseline.latency_us,
            baseline.error_rate_q16_raw,
            std::ptr::from_ref(&thresholds_ffi),
            candidate_cfg.min_detector_count as i32,
            candidate_cfg.min_residual_q_raw,
            candidate_cfg.min_length_windows as i32,
            MAX_CANDIDATES_PER_ENTITY,
            std::ptr::null_mut(),
            candidates_ptr,
            count_ptr,
            digests_ptr,
            stream,
            c_int::from(workspace.has_const_thresholds()),
            std::ptr::from_mut(&mut device_timings),
        )
    };
    if status != 0 {
        return Err(GpuError::KernelFailed(status));
    }
    // NOTE(review): assumes the FFI call synchronizes `stream` before returning,
    // since the pinned shadows are read immediately; confirm.
    let count_slice = workspace
        .candidate_count_pinned
        .as_ref()
        .expect("guarded above")
        .as_slice();
    let slots = workspace
        .candidates_pinned
        .as_ref()
        .expect("guarded above")
        .as_slice();
    // Clamp the device-reported count to the per-entity slot capacity so a bad
    // count cannot read into the neighbouring entity's slots.
    let per_entity = MAX_CANDIDATES_PER_ENTITY as usize;
    let counts = &count_slice[..workspace.n_entities as usize];
    let mut candidates: Vec<CandidateInterval> = Vec::new();
    for (entity_id, &raw_count) in counts.iter().enumerate() {
        let count = (raw_count as usize).min(per_entity);
        let base = entity_id * per_entity;
        candidates.extend_from_slice(&slots[base..base + count]);
    }
    // Four consecutive 32-byte digests: residual, sign, detector, consensus.
    let stage_digests_host = workspace
        .stage_digests_pinned
        .as_ref()
        .expect("guarded above")
        .as_slice();
    let mut residual_digest = [0u8; 32];
    let mut sign_digest = [0u8; 32];
    let mut detector_digest = [0u8; 32];
    let mut consensus_digest = [0u8; 32];
    residual_digest.copy_from_slice(&stage_digests_host[0..32]);
    sign_digest.copy_from_slice(&stage_digests_host[32..64]);
    detector_digest.copy_from_slice(&stage_digests_host[64..96]);
    consensus_digest.copy_from_slice(&stage_digests_host[96..128]);
    let t_bank = Instant::now();
    let case = dsfb_gpu_debug_core::casefile::build_throughput_from_artifacts_and_device_digests(
        events,
        contract,
        "cuda",
        &features,
        residual_digest,
        sign_digest,
        detector_digest,
        consensus_digest,
        &[],
        &candidates,
    );
    #[allow(clippy::cast_precision_loss)]
    let bank_and_finalize_us = (t_bank.elapsed().as_nanos() as u64) as f32 / 1_000.0_f32;
    let host_timings = R8HostStageTimings {
        features_us,
        bank_and_finalize_us,
    };
    Ok((case, device_timings, host_timings))
}
/// Tree-digest variant of [`build_gpu_throughput_pinned_async_on_workspace`].
///
/// Stage digests are computed by
/// `dsfb_gpu_run_pipeline_throughput_tree_digests_async_on_workspace`, using the
/// workspace's tree-digest leaves and scratch buffers and its chunk size. A full
/// [`CaseFile`] is then built from those digests and the candidate intervals.
///
/// # Errors
///
/// * Any error from `assert_compatible`.
/// * [`GpuError::InvalidInput`] when the workspace lacks pinned shadows + stream,
///   or lacks tree-digest scratch.
/// * [`GpuError::KernelFailed`] carrying the non-zero FFI status code.
#[allow(clippy::expect_used, clippy::too_many_lines)]
pub fn build_gpu_throughput_pinned_async_on_workspace_tree(
    events: &[TraceEvent],
    contract: &Contract,
    workspace: &mut GpuWorkspace,
) -> Result<CaseFile, GpuError> {
    workspace.assert_compatible(contract)?;
    if !workspace.has_pinned_async() {
        return Err(GpuError::InvalidInput(
            "GpuWorkspace was not built with pinned shadows + stream; \
             call GpuWorkspace::new_with_pinned_async(contract)",
        ));
    }
    if !workspace.has_tree_digest() {
        return Err(GpuError::InvalidInput(
            "GpuWorkspace tree-digest scratch is not allocated; \
             call GpuWorkspace::new_with_pinned_async(contract) on a fresh workspace",
        ));
    }
    let n_entities = contract.n_entities as i32;
    let n_windows = contract.n_windows as i32;
    let features = compute_features(
        events,
        contract.n_windows,
        contract.n_entities,
        u64::from(contract.window_size_ms) * 1_000_000,
    );
    // Stage host features into the pinned shadow that the FFI call reads from.
    {
        let pinned = workspace
            .features_pinned
            .as_mut()
            .expect("has_pinned_async guarantees features_pinned is Some");
        pinned.as_mut_slice().copy_from_slice(&features);
    }
    let thresholds_ffi = DetectorThresholdsFfi::from(&DetectorThresholds::CANONICAL);
    let baseline = Baseline::CANONICAL;
    let candidate_cfg = CandidateConfig::CANONICAL;
    let features_ptr = workspace
        .features_pinned
        .as_ref()
        .expect("guarded above")
        .as_ptr();
    let candidates_ptr = workspace
        .candidates_pinned
        .as_mut()
        .expect("guarded above")
        .as_mut_ptr();
    let count_ptr = workspace
        .candidate_count_pinned
        .as_mut()
        .expect("guarded above")
        .as_mut_ptr();
    let digests_ptr = workspace
        .stage_digests_pinned
        .as_mut()
        .expect("guarded above")
        .as_mut_ptr();
    let stream = workspace.stream_handle();
    let tree_chunk_size = workspace.tree_chunk_size();
    let tree_leaves_stride = workspace.tree_leaves_stride_bytes();
    let tree_scratch_stride = workspace.tree_scratch_stride_bytes();
    let d_tree_leaves = workspace.d_tree_leaves();
    let d_tree_scratch = workspace.d_tree_scratch();
    #[allow(unsafe_code)]
    let status: c_int = unsafe {
        crate::ffi::dsfb_gpu_run_pipeline_throughput_tree_digests_async_on_workspace(
            workspace.d_features(),
            workspace.d_residuals(),
            workspace.d_signs(),
            workspace.d_detectors(),
            workspace.d_consensus(),
            workspace.d_candidates(),
            workspace.d_candidate_count(),
            workspace.d_stage_digests(),
            d_tree_leaves,
            tree_leaves_stride,
            d_tree_scratch,
            tree_scratch_stride,
            features_ptr,
            n_entities,
            n_windows,
            contract.ewma_alpha_q16_raw,
            baseline.latency_us,
            baseline.error_rate_q16_raw,
            std::ptr::from_ref(&thresholds_ffi),
            candidate_cfg.min_detector_count as i32,
            candidate_cfg.min_residual_q_raw,
            candidate_cfg.min_length_windows as i32,
            MAX_CANDIDATES_PER_ENTITY,
            candidates_ptr,
            count_ptr,
            digests_ptr,
            stream,
            c_int::from(workspace.has_const_thresholds()),
            tree_chunk_size,
        )
    };
    if status != 0 {
        return Err(GpuError::KernelFailed(status));
    }
    // NOTE(review): assumes the FFI call synchronizes `stream` before returning,
    // since the pinned shadows are read immediately; confirm.
    let count_slice = workspace
        .candidate_count_pinned
        .as_ref()
        .expect("guarded above")
        .as_slice();
    let slots = workspace
        .candidates_pinned
        .as_ref()
        .expect("guarded above")
        .as_slice();
    // Clamp the device-reported count to the per-entity slot capacity so a bad
    // count cannot read into the neighbouring entity's slots.
    let per_entity = MAX_CANDIDATES_PER_ENTITY as usize;
    let counts = &count_slice[..workspace.n_entities as usize];
    let mut candidates: Vec<CandidateInterval> = Vec::new();
    for (entity_id, &raw_count) in counts.iter().enumerate() {
        let count = (raw_count as usize).min(per_entity);
        let base = entity_id * per_entity;
        candidates.extend_from_slice(&slots[base..base + count]);
    }
    // Four consecutive 32-byte digests: residual, sign, detector, consensus.
    let stage_digests_host = workspace
        .stage_digests_pinned
        .as_ref()
        .expect("guarded above")
        .as_slice();
    let mut residual_digest = [0u8; 32];
    let mut sign_digest = [0u8; 32];
    let mut detector_digest = [0u8; 32];
    let mut consensus_digest = [0u8; 32];
    residual_digest.copy_from_slice(&stage_digests_host[0..32]);
    sign_digest.copy_from_slice(&stage_digests_host[32..64]);
    detector_digest.copy_from_slice(&stage_digests_host[64..96]);
    consensus_digest.copy_from_slice(&stage_digests_host[96..128]);
    Ok(
        dsfb_gpu_debug_core::casefile::build_throughput_from_artifacts_and_device_digests(
            events,
            contract,
            "cuda",
            &features,
            residual_digest,
            sign_digest,
            detector_digest,
            consensus_digest,
            &[],
            &candidates,
        ),
    )
}
/// Tree-digest pipeline that returns a compact verdict case file.
///
/// The device work is the same as
/// [`build_gpu_throughput_pinned_async_on_workspace_tree`]. The result, however,
/// is built with `build_throughput_compact_verdict_from_device_digests` against
/// the caller-supplied `fixture` hashes, rather than from the full host
/// artifacts.
///
/// # Errors
///
/// * Any error from `assert_compatible`.
/// * [`GpuError::InvalidInput`] when the workspace lacks pinned shadows + stream,
///   or lacks tree-digest scratch.
/// * [`GpuError::KernelFailed`] carrying the non-zero FFI status code.
#[allow(clippy::expect_used, clippy::too_many_lines)]
pub fn build_gpu_throughput_pinned_async_on_workspace_tree_compact(
    events: &[TraceEvent],
    contract: &Contract,
    workspace: &mut GpuWorkspace,
    fixture: &dsfb_gpu_debug_core::casefile::FixtureHashes,
) -> Result<CaseFile, GpuError> {
    workspace.assert_compatible(contract)?;
    if !workspace.has_pinned_async() {
        return Err(GpuError::InvalidInput(
            "GpuWorkspace was not built with pinned shadows + stream; \
             call GpuWorkspace::new_with_pinned_async(contract)",
        ));
    }
    if !workspace.has_tree_digest() {
        return Err(GpuError::InvalidInput(
            "GpuWorkspace tree-digest scratch is not allocated; \
             call GpuWorkspace::new_with_pinned_async(contract) on a fresh workspace",
        ));
    }
    let n_entities = contract.n_entities as i32;
    let n_windows = contract.n_windows as i32;
    let features = compute_features(
        events,
        contract.n_windows,
        contract.n_entities,
        u64::from(contract.window_size_ms) * 1_000_000,
    );
    // Stage host features into the pinned shadow that the FFI call reads from.
    {
        let pinned = workspace
            .features_pinned
            .as_mut()
            .expect("has_pinned_async guarantees features_pinned is Some");
        pinned.as_mut_slice().copy_from_slice(&features);
    }
    let thresholds_ffi = DetectorThresholdsFfi::from(&DetectorThresholds::CANONICAL);
    let baseline = Baseline::CANONICAL;
    let candidate_cfg = CandidateConfig::CANONICAL;
    let features_ptr = workspace
        .features_pinned
        .as_ref()
        .expect("guarded above")
        .as_ptr();
    let candidates_ptr = workspace
        .candidates_pinned
        .as_mut()
        .expect("guarded above")
        .as_mut_ptr();
    let count_ptr = workspace
        .candidate_count_pinned
        .as_mut()
        .expect("guarded above")
        .as_mut_ptr();
    let digests_ptr = workspace
        .stage_digests_pinned
        .as_mut()
        .expect("guarded above")
        .as_mut_ptr();
    let stream = workspace.stream_handle();
    let tree_chunk_size = workspace.tree_chunk_size();
    let tree_leaves_stride = workspace.tree_leaves_stride_bytes();
    let tree_scratch_stride = workspace.tree_scratch_stride_bytes();
    let d_tree_leaves = workspace.d_tree_leaves();
    let d_tree_scratch = workspace.d_tree_scratch();
    #[allow(unsafe_code)]
    let status: c_int = unsafe {
        crate::ffi::dsfb_gpu_run_pipeline_throughput_tree_digests_async_on_workspace(
            workspace.d_features(),
            workspace.d_residuals(),
            workspace.d_signs(),
            workspace.d_detectors(),
            workspace.d_consensus(),
            workspace.d_candidates(),
            workspace.d_candidate_count(),
            workspace.d_stage_digests(),
            d_tree_leaves,
            tree_leaves_stride,
            d_tree_scratch,
            tree_scratch_stride,
            features_ptr,
            n_entities,
            n_windows,
            contract.ewma_alpha_q16_raw,
            baseline.latency_us,
            baseline.error_rate_q16_raw,
            std::ptr::from_ref(&thresholds_ffi),
            candidate_cfg.min_detector_count as i32,
            candidate_cfg.min_residual_q_raw,
            candidate_cfg.min_length_windows as i32,
            MAX_CANDIDATES_PER_ENTITY,
            candidates_ptr,
            count_ptr,
            digests_ptr,
            stream,
            c_int::from(workspace.has_const_thresholds()),
            tree_chunk_size,
        )
    };
    if status != 0 {
        return Err(GpuError::KernelFailed(status));
    }
    // NOTE(review): assumes the FFI call synchronizes `stream` before returning,
    // since the pinned shadows are read immediately; confirm.
    let count_slice = workspace
        .candidate_count_pinned
        .as_ref()
        .expect("guarded above")
        .as_slice();
    let slots = workspace
        .candidates_pinned
        .as_ref()
        .expect("guarded above")
        .as_slice();
    // Clamp the device-reported count to the per-entity slot capacity so a bad
    // count cannot read into the neighbouring entity's slots.
    let per_entity = MAX_CANDIDATES_PER_ENTITY as usize;
    let counts = &count_slice[..workspace.n_entities as usize];
    let mut candidates: Vec<CandidateInterval> = Vec::new();
    for (entity_id, &raw_count) in counts.iter().enumerate() {
        let count = (raw_count as usize).min(per_entity);
        let base = entity_id * per_entity;
        candidates.extend_from_slice(&slots[base..base + count]);
    }
    // Four consecutive 32-byte digests: residual, sign, detector, consensus.
    let stage_digests_host = workspace
        .stage_digests_pinned
        .as_ref()
        .expect("guarded above")
        .as_slice();
    let mut residual_digest = [0u8; 32];
    let mut sign_digest = [0u8; 32];
    let mut detector_digest = [0u8; 32];
    let mut consensus_digest = [0u8; 32];
    residual_digest.copy_from_slice(&stage_digests_host[0..32]);
    sign_digest.copy_from_slice(&stage_digests_host[32..64]);
    detector_digest.copy_from_slice(&stage_digests_host[64..96]);
    consensus_digest.copy_from_slice(&stage_digests_host[96..128]);
    Ok(
        dsfb_gpu_debug_core::casefile::build_throughput_compact_verdict_from_device_digests(
            contract,
            "cuda",
            fixture,
            residual_digest,
            sign_digest,
            detector_digest,
            consensus_digest,
            &candidates,
        ),
    )
}
/// Runs the D64 wide-detector evaluation on the GPU and copies the per-cell
/// results back to the host.
///
/// The returned vector has `n_entities * n_windows` entries. It is filled by
/// `dsfb_gpu_evaluate_detector_wide_d64_on_workspace` through the host pointer
/// passed to it.
///
/// NOTE(review): this ensures the axis-5 grid-sum, drift, compact-digest and
/// candidate-parallel buffers exist even though none of them are passed to this
/// FFI call. This is presumably to keep the workspace ready for the d64 pipeline;
/// confirm.
///
/// # Errors
///
/// * Any error from `assert_compatible` or the `ensure_*` allocators.
/// * [`GpuError::InvalidInput`] when the workspace was built without pinned
///   shadows + stream.
/// * [`GpuError::KernelFailed`] carrying the non-zero FFI status code.
#[allow(clippy::expect_used, clippy::too_many_lines)]
pub fn evaluate_detector_wide_d64_on_workspace(
    events: &[TraceEvent],
    contract: &Contract,
    workspace: &mut GpuWorkspace,
) -> Result<Vec<dsfb_gpu_debug_core::detector::DetectorCellWide>, GpuError> {
    type WideCell = dsfb_gpu_debug_core::detector::DetectorCellWide;

    workspace.assert_compatible(contract)?;
    if !workspace.has_pinned_async() {
        return Err(GpuError::InvalidInput(
            "GpuWorkspace was not built with pinned shadows + stream; \
             call GpuWorkspace::new_with_pinned_async(contract)",
        ));
    }
    workspace.ensure_wide_detector_buffer()?;
    workspace.ensure_axis5_grid_sum_buffer()?;
    workspace.ensure_drift_buffer()?;
    workspace.ensure_detector_digest_compact_buffer()?;
    workspace.ensure_candidate_parallel_buffers()?;

    let entity_count = contract.n_entities as i32;
    let window_count = contract.n_windows as i32;
    let cell_count = contract.n_entities as usize * contract.n_windows as usize;

    let window_ns = u64::from(contract.window_size_ms) * 1_000_000;
    let host_features = compute_features(events, contract.n_windows, contract.n_entities, window_ns);
    workspace
        .features_pinned
        .as_mut()
        .expect("has_pinned_async guarantees features_pinned is Some")
        .as_mut_slice()
        .copy_from_slice(&host_features);

    let thresholds = DetectorThresholdsFfi::from(&DetectorThresholds::CANONICAL);
    let canonical_baseline = Baseline::CANONICAL;
    let pinned_features = workspace
        .features_pinned
        .as_ref()
        .expect("guarded above")
        .as_ptr();
    let stream_handle = workspace.stream_handle();

    // Host destination for the wide detector cells, one per (entity, window).
    let mut wide_cells: Vec<WideCell> = vec![WideCell::default(); cell_count];

    #[allow(unsafe_code)]
    let rc: c_int = unsafe {
        crate::ffi::dsfb_gpu_evaluate_detector_wide_d64_on_workspace(
            workspace.d_features(),
            workspace.d_residuals(),
            workspace.d_signs(),
            workspace.d_detectors_wide(),
            pinned_features,
            entity_count,
            window_count,
            contract.ewma_alpha_q16_raw,
            canonical_baseline.latency_us,
            canonical_baseline.error_rate_q16_raw,
            std::ptr::from_ref(&thresholds),
            wide_cells.as_mut_ptr(),
            stream_handle,
            0,
        )
    };
    match rc {
        0 => Ok(wide_cells),
        failed => Err(GpuError::KernelFailed(failed)),
    }
}
/// D64 wide-detector throughput pipeline with tree digests, returning a compact
/// verdict case file.
///
/// Unlike the feature-based variants, this one stages raw events into the pinned
/// events shadow as [`GpuTraceEventCompact`] records, and hands them to
/// `dsfb_gpu_run_pipeline_throughput_d64_tree_async_on_workspace` together with
/// the D64 profile's active detector count and mask-word count. The result is
/// built with `build_throughput_compact_verdict_from_device_digests` against
/// `fixture`.
///
/// # Errors
///
/// * Any error from `assert_compatible` or the `ensure_*` allocators.
/// * [`GpuError::InvalidInput`] when the workspace lacks pinned shadows + stream,
///   or lacks tree-digest scratch.
/// * [`GpuError::KernelFailed`] carrying the non-zero FFI status code.
#[allow(clippy::expect_used, clippy::too_many_lines)]
pub fn build_gpu_throughput_pinned_async_on_workspace_d64_tree_compact(
    events: &[TraceEvent],
    contract: &Contract,
    workspace: &mut GpuWorkspace,
    fixture: &dsfb_gpu_debug_core::casefile::FixtureHashes,
) -> Result<CaseFile, GpuError> {
    workspace.assert_compatible(contract)?;
    if !workspace.has_pinned_async() {
        return Err(GpuError::InvalidInput(
            "GpuWorkspace was not built with pinned shadows + stream; \
             call GpuWorkspace::new_with_pinned_async(contract)",
        ));
    }
    if !workspace.has_tree_digest() {
        return Err(GpuError::InvalidInput(
            "GpuWorkspace tree-digest scratch is not allocated; \
             call GpuWorkspace::new_with_pinned_async(contract) on a fresh workspace",
        ));
    }
    workspace.ensure_wide_detector_buffer()?;
    workspace.ensure_axis5_grid_sum_buffer()?;
    workspace.ensure_drift_buffer()?;
    workspace.ensure_detector_digest_compact_buffer()?;
    workspace.ensure_candidate_parallel_buffers()?;
    workspace.ensure_candidate_run_buffer()?;
    workspace.ensure_events_buffer(events.len() as u64)?;
    let n_entities = contract.n_entities as i32;
    let n_windows = contract.n_windows as i32;
    let window_size_ns: u64 = u64::from(contract.window_size_ms) * 1_000_000;
    // Stage events into the pinned shadow in the compact on-device layout.
    {
        let pinned = workspace
            .events_pinned
            .as_mut()
            .expect("ensure_events_buffer guarantees events_pinned is Some");
        let slice = pinned.as_mut_slice();
        // The device is told `events.len()` below, so a shorter shadow would
        // silently drop events here; catch that in debug builds.
        debug_assert!(
            slice.len() >= events.len(),
            "events_pinned is smaller than the event count"
        );
        for (dst, src) in slice.iter_mut().zip(events) {
            *dst = GpuTraceEventCompact::from_trace_event(src);
        }
    }
    let thresholds_ffi = DetectorThresholdsFfi::from(&DetectorThresholds::CANONICAL);
    let baseline = Baseline::CANONICAL;
    let candidate_cfg = CandidateConfig::CANONICAL;
    let events_ptr = workspace
        .events_pinned
        .as_ref()
        .expect("guarded above")
        .as_ptr();
    let candidates_ptr = workspace
        .candidates_pinned
        .as_mut()
        .expect("guarded above")
        .as_mut_ptr();
    let count_ptr = workspace
        .candidate_count_pinned
        .as_mut()
        .expect("guarded above")
        .as_mut_ptr();
    let digests_ptr = workspace
        .stage_digests_pinned
        .as_mut()
        .expect("guarded above")
        .as_mut_ptr();
    let stream = workspace.stream_handle();
    let tree_chunk_size = workspace.tree_chunk_size();
    let tree_leaves_stride = workspace.tree_leaves_stride_bytes();
    let tree_scratch_stride = workspace.tree_scratch_stride_bytes();
    let d_tree_leaves = workspace.d_tree_leaves();
    let d_tree_scratch = workspace.d_tree_scratch();
    let d_detectors_wide = workspace.d_detectors_wide();
    #[allow(clippy::cast_ptr_alignment)]
    let d_axis5_grid_sum = workspace.d_axis5_grid_sum().cast::<i64>();
    let d_drift_buffer = workspace.d_drift_buffer();
    let d_detector_digest_compact = workspace.d_detector_digest_compact();
    let d_candidate_fired = workspace.d_candidate_fired();
    let d_candidate_boundaries = workspace.d_candidate_boundaries();
    let d_candidate_run_buffer = workspace.d_candidate_run_buffer();
    let d_candidate_run_count = workspace.d_candidate_run_count();
    let d_events = workspace.d_events();
    let n_events_u64 = events.len() as u64;
    // Nanoseconds of trace time per event. An empty trace uses 1 to keep the
    // value non-zero.
    let ticks_per_event_ns: u64 = if n_events_u64 == 0 {
        1
    } else {
        (u64::from(contract.n_windows) * window_size_ns) / n_events_u64
    };
    let profile_id_i32 = DetectorProfile::D64.active_detector_count() as i32;
    let wide_mask_words_used_i32 = DetectorProfile::D64.mask_word_count() as i32;
    #[allow(unsafe_code)]
    let status: c_int = unsafe {
        crate::ffi::dsfb_gpu_run_pipeline_throughput_d64_tree_async_on_workspace(
            workspace.d_features(),
            workspace.d_residuals(),
            workspace.d_signs(),
            d_drift_buffer,
            d_detectors_wide,
            workspace.d_consensus(),
            d_axis5_grid_sum,
            d_detector_digest_compact,
            d_candidate_fired,
            d_candidate_boundaries,
            d_candidate_run_buffer,
            d_candidate_run_count,
            workspace.d_candidates(),
            workspace.d_candidate_count(),
            workspace.d_stage_digests(),
            d_tree_leaves,
            tree_leaves_stride,
            d_tree_scratch,
            tree_scratch_stride,
            d_events,
            events_ptr,
            n_events_u64,
            ticks_per_event_ns,
            window_size_ns,
            n_entities,
            n_windows,
            contract.ewma_alpha_q16_raw,
            baseline.latency_us,
            baseline.error_rate_q16_raw,
            std::ptr::from_ref(&thresholds_ffi),
            candidate_cfg.min_detector_count as i32,
            candidate_cfg.min_residual_q_raw,
            candidate_cfg.min_length_windows as i32,
            MAX_CANDIDATES_PER_ENTITY,
            profile_id_i32,
            wide_mask_words_used_i32,
            candidates_ptr,
            count_ptr,
            digests_ptr,
            stream,
            tree_chunk_size,
            0,
            std::ptr::null_mut(),
        )
    };
    if status != 0 {
        return Err(GpuError::KernelFailed(status));
    }
    // NOTE(review): assumes the FFI call synchronizes `stream` before returning,
    // since the pinned shadows are read immediately; confirm.
    let count_slice = workspace
        .candidate_count_pinned
        .as_ref()
        .expect("guarded above")
        .as_slice();
    let slots = workspace
        .candidates_pinned
        .as_ref()
        .expect("guarded above")
        .as_slice();
    // Clamp the device-reported count to the per-entity slot capacity so a bad
    // count cannot read into the neighbouring entity's slots.
    let per_entity = MAX_CANDIDATES_PER_ENTITY as usize;
    let counts = &count_slice[..workspace.n_entities as usize];
    let mut candidates: Vec<CandidateInterval> = Vec::new();
    for (entity_id, &raw_count) in counts.iter().enumerate() {
        let count = (raw_count as usize).min(per_entity);
        let base = entity_id * per_entity;
        candidates.extend_from_slice(&slots[base..base + count]);
    }
    // Four consecutive 32-byte digests: residual, sign, detector, consensus.
    let stage_digests_host = workspace
        .stage_digests_pinned
        .as_ref()
        .expect("guarded above")
        .as_slice();
    let mut residual_digest = [0u8; 32];
    let mut sign_digest = [0u8; 32];
    let mut detector_digest = [0u8; 32];
    let mut consensus_digest = [0u8; 32];
    residual_digest.copy_from_slice(&stage_digests_host[0..32]);
    sign_digest.copy_from_slice(&stage_digests_host[32..64]);
    detector_digest.copy_from_slice(&stage_digests_host[64..96]);
    consensus_digest.copy_from_slice(&stage_digests_host[96..128]);
    Ok(
        dsfb_gpu_debug_core::casefile::build_throughput_compact_verdict_from_device_digests(
            contract,
            "cuda",
            fixture,
            residual_digest,
            sign_digest,
            detector_digest,
            consensus_digest,
            &candidates,
        ),
    )
}
/// D128 wide-detector throughput pipeline with tree digests, returning a compact
/// verdict case file.
///
/// Raw events are staged into the pinned events shadow as
/// [`GpuTraceEventCompact`] records and passed to
/// `dsfb_gpu_run_pipeline_throughput_d128_tree_async_on_workspace`. The result is
/// built with `build_throughput_compact_verdict_from_device_digests` against
/// `fixture`.
///
/// NOTE(review): the drift buffer is ensured but not passed to the d128 FFI call.
/// The ensure is kept so the workspace's allocation state is unchanged; confirm
/// whether it is still needed.
///
/// # Errors
///
/// * Any error from `assert_compatible` or the `ensure_*` allocators.
/// * [`GpuError::InvalidInput`] when the workspace lacks pinned shadows + stream,
///   or lacks tree-digest scratch.
/// * [`GpuError::KernelFailed`] carrying the non-zero FFI status code.
#[allow(clippy::expect_used, clippy::too_many_lines)]
pub fn build_gpu_throughput_pinned_async_on_workspace_d128_tree_compact(
    events: &[TraceEvent],
    contract: &Contract,
    workspace: &mut GpuWorkspace,
    fixture: &dsfb_gpu_debug_core::casefile::FixtureHashes,
) -> Result<CaseFile, GpuError> {
    workspace.assert_compatible(contract)?;
    if !workspace.has_pinned_async() {
        return Err(GpuError::InvalidInput(
            "GpuWorkspace was not built with pinned shadows + stream; \
             call GpuWorkspace::new_with_pinned_async(contract)",
        ));
    }
    if !workspace.has_tree_digest() {
        return Err(GpuError::InvalidInput(
            "GpuWorkspace tree-digest scratch is not allocated; \
             call GpuWorkspace::new_with_pinned_async(contract) on a fresh workspace",
        ));
    }
    workspace.ensure_wide_detector_buffer()?;
    workspace.ensure_axis5_grid_sum_buffer()?;
    workspace.ensure_drift_buffer()?;
    workspace.ensure_candidate_parallel_buffers()?;
    workspace.ensure_events_buffer(events.len() as u64)?;
    let n_entities = contract.n_entities as i32;
    let n_windows = contract.n_windows as i32;
    let window_size_ns: u64 = u64::from(contract.window_size_ms) * 1_000_000;
    // Stage events into the pinned shadow in the compact on-device layout.
    {
        let pinned = workspace
            .events_pinned
            .as_mut()
            .expect("ensure_events_buffer guarantees events_pinned is Some");
        let slice = pinned.as_mut_slice();
        // The device is told `events.len()` below, so a shorter shadow would
        // silently drop events here; catch that in debug builds.
        debug_assert!(
            slice.len() >= events.len(),
            "events_pinned is smaller than the event count"
        );
        for (dst, src) in slice.iter_mut().zip(events) {
            *dst = GpuTraceEventCompact::from_trace_event(src);
        }
    }
    let thresholds_ffi = DetectorThresholdsFfi::from(&DetectorThresholds::CANONICAL);
    let baseline = Baseline::CANONICAL;
    let candidate_cfg = CandidateConfig::CANONICAL;
    let events_ptr = workspace
        .events_pinned
        .as_ref()
        .expect("guarded above")
        .as_ptr();
    let candidates_ptr = workspace
        .candidates_pinned
        .as_mut()
        .expect("guarded above")
        .as_mut_ptr();
    let count_ptr = workspace
        .candidate_count_pinned
        .as_mut()
        .expect("guarded above")
        .as_mut_ptr();
    let digests_ptr = workspace
        .stage_digests_pinned
        .as_mut()
        .expect("guarded above")
        .as_mut_ptr();
    let stream = workspace.stream_handle();
    let tree_chunk_size = workspace.tree_chunk_size();
    let tree_leaves_stride = workspace.tree_leaves_stride_bytes();
    let tree_scratch_stride = workspace.tree_scratch_stride_bytes();
    let d_tree_leaves = workspace.d_tree_leaves();
    let d_tree_scratch = workspace.d_tree_scratch();
    let d_detectors_wide = workspace.d_detectors_wide();
    #[allow(clippy::cast_ptr_alignment)]
    let d_axis5_grid_sum = workspace.d_axis5_grid_sum().cast::<i64>();
    let d_candidate_fired = workspace.d_candidate_fired();
    let d_candidate_boundaries = workspace.d_candidate_boundaries();
    let d_events = workspace.d_events();
    let n_events_u64 = events.len() as u64;
    // Nanoseconds of trace time per event. An empty trace uses 1 to keep the
    // value non-zero.
    let ticks_per_event_ns: u64 = if n_events_u64 == 0 {
        1
    } else {
        (u64::from(contract.n_windows) * window_size_ns) / n_events_u64
    };
    #[allow(unsafe_code)]
    let status: c_int = unsafe {
        crate::ffi::dsfb_gpu_run_pipeline_throughput_d128_tree_async_on_workspace(
            workspace.d_features(),
            workspace.d_residuals(),
            workspace.d_signs(),
            d_detectors_wide,
            workspace.d_consensus(),
            d_axis5_grid_sum,
            d_candidate_fired,
            d_candidate_boundaries,
            workspace.d_candidates(),
            workspace.d_candidate_count(),
            workspace.d_stage_digests(),
            d_tree_leaves,
            tree_leaves_stride,
            d_tree_scratch,
            tree_scratch_stride,
            d_events,
            events_ptr,
            n_events_u64,
            ticks_per_event_ns,
            window_size_ns,
            n_entities,
            n_windows,
            contract.ewma_alpha_q16_raw,
            baseline.latency_us,
            baseline.error_rate_q16_raw,
            std::ptr::from_ref(&thresholds_ffi),
            candidate_cfg.min_detector_count as i32,
            candidate_cfg.min_residual_q_raw,
            candidate_cfg.min_length_windows as i32,
            MAX_CANDIDATES_PER_ENTITY,
            candidates_ptr,
            count_ptr,
            digests_ptr,
            stream,
            tree_chunk_size,
        )
    };
    if status != 0 {
        return Err(GpuError::KernelFailed(status));
    }
    // NOTE(review): assumes the FFI call synchronizes `stream` before returning,
    // since the pinned shadows are read immediately; confirm.
    let count_slice = workspace
        .candidate_count_pinned
        .as_ref()
        .expect("guarded above")
        .as_slice();
    let slots = workspace
        .candidates_pinned
        .as_ref()
        .expect("guarded above")
        .as_slice();
    // Clamp the device-reported count to the per-entity slot capacity so a bad
    // count cannot read into the neighbouring entity's slots.
    let per_entity = MAX_CANDIDATES_PER_ENTITY as usize;
    let counts = &count_slice[..workspace.n_entities as usize];
    let mut candidates: Vec<CandidateInterval> = Vec::new();
    for (entity_id, &raw_count) in counts.iter().enumerate() {
        let count = (raw_count as usize).min(per_entity);
        let base = entity_id * per_entity;
        candidates.extend_from_slice(&slots[base..base + count]);
    }
    // Four consecutive 32-byte digests: residual, sign, detector, consensus.
    let stage_digests_host = workspace
        .stage_digests_pinned
        .as_ref()
        .expect("guarded above")
        .as_slice();
    let mut residual_digest = [0u8; 32];
    let mut sign_digest = [0u8; 32];
    let mut detector_digest = [0u8; 32];
    let mut consensus_digest = [0u8; 32];
    residual_digest.copy_from_slice(&stage_digests_host[0..32]);
    sign_digest.copy_from_slice(&stage_digests_host[32..64]);
    detector_digest.copy_from_slice(&stage_digests_host[64..96]);
    consensus_digest.copy_from_slice(&stage_digests_host[96..128]);
    Ok(
        dsfb_gpu_debug_core::casefile::build_throughput_compact_verdict_from_device_digests(
            contract,
            "cuda",
            fixture,
            residual_digest,
            sign_digest,
            detector_digest,
            consensus_digest,
            &candidates,
        ),
    )
}
#[allow(clippy::expect_used, clippy::too_many_lines)]
pub fn build_gpu_throughput_pinned_async_on_workspace_d205_tree_compact(
events: &[TraceEvent],
contract: &Contract,
workspace: &mut GpuWorkspace,
fixture: &dsfb_gpu_debug_core::casefile::FixtureHashes,
) -> Result<CaseFile, GpuError> {
workspace.assert_compatible(contract)?;
if !workspace.has_pinned_async() {
return Err(GpuError::InvalidInput(
"GpuWorkspace was not built with pinned shadows + stream; \
call GpuWorkspace::new_with_pinned_async(contract)",
));
}
if !workspace.has_tree_digest() {
return Err(GpuError::InvalidInput(
"GpuWorkspace tree-digest scratch is not allocated; \
call GpuWorkspace::new_with_pinned_async(contract) on a fresh workspace",
));
}
workspace.ensure_wide_detector_buffer()?;
workspace.ensure_axis5_grid_sum_buffer()?;
workspace.ensure_drift_buffer()?;
workspace.ensure_candidate_parallel_buffers()?;
workspace.ensure_events_buffer(events.len() as u64)?;
let n_entities = contract.n_entities as i32;
let n_windows = contract.n_windows as i32;
let window_size_ns: u64 = u64::from(contract.window_size_ms) * 1_000_000;
{
let pinned = workspace
.events_pinned
.as_mut()
.expect("ensure_events_buffer guarantees events_pinned is Some");
let slice = pinned.as_mut_slice();
for (dst, src) in slice.iter_mut().zip(events.iter()).take(events.len()) {
*dst = GpuTraceEventCompact::from_trace_event(src);
}
}
let thresholds_ffi = DetectorThresholdsFfi::from(&DetectorThresholds::CANONICAL);
let baseline = Baseline::CANONICAL;
let candidate_cfg = CandidateConfig::CANONICAL;
let events_ptr = workspace
.events_pinned
.as_ref()
.expect("guarded above")
.as_ptr();
let candidates_ptr = workspace
.candidates_pinned
.as_mut()
.expect("guarded above")
.as_mut_ptr();
let count_ptr = workspace
.candidate_count_pinned
.as_mut()
.expect("guarded above")
.as_mut_ptr();
let digests_ptr = workspace
.stage_digests_pinned
.as_mut()
.expect("guarded above")
.as_mut_ptr();
let stream = workspace.stream_handle();
let tree_chunk_size = workspace.tree_chunk_size();
let tree_leaves_stride = workspace.tree_leaves_stride_bytes();
let tree_scratch_stride = workspace.tree_scratch_stride_bytes();
let d_tree_leaves = workspace.d_tree_leaves();
let d_tree_scratch = workspace.d_tree_scratch();
let d_detectors_wide = workspace.d_detectors_wide();
#[allow(clippy::cast_ptr_alignment)]
let d_axis5_grid_sum = workspace.d_axis5_grid_sum().cast::<i64>();
let _d_drift_buffer = workspace.d_drift_buffer(); let d_candidate_fired = workspace.d_candidate_fired();
let d_candidate_boundaries = workspace.d_candidate_boundaries();
let d_events = workspace.d_events();
let n_events_u64 = events.len() as u64;
let ticks_per_event_ns: u64 = if n_events_u64 == 0 {
1
} else {
(u64::from(contract.n_windows) * window_size_ns) / n_events_u64
};
#[allow(unsafe_code)]
let status: c_int = unsafe {
crate::ffi::dsfb_gpu_run_pipeline_throughput_d205_tree_async_on_workspace(
workspace.d_features(),
workspace.d_residuals(),
workspace.d_signs(),
d_detectors_wide,
workspace.d_consensus(),
d_axis5_grid_sum,
d_candidate_fired,
d_candidate_boundaries,
workspace.d_candidates(),
workspace.d_candidate_count(),
workspace.d_stage_digests(),
d_tree_leaves,
tree_leaves_stride,
d_tree_scratch,
tree_scratch_stride,
d_events,
events_ptr,
n_events_u64,
ticks_per_event_ns,
window_size_ns,
n_entities,
n_windows,
contract.ewma_alpha_q16_raw,
baseline.latency_us,
baseline.error_rate_q16_raw,
std::ptr::from_ref(&thresholds_ffi),
candidate_cfg.min_detector_count as i32,
candidate_cfg.min_residual_q_raw,
candidate_cfg.min_length_windows as i32,
MAX_CANDIDATES_PER_ENTITY,
candidates_ptr,
count_ptr,
digests_ptr,
stream,
tree_chunk_size,
)
};
if status != 0 {
return Err(GpuError::KernelFailed(status));
}
let count_slice = workspace
.candidate_count_pinned
.as_ref()
.expect("guarded above")
.as_slice();
let slots = workspace
.candidates_pinned
.as_ref()
.expect("guarded above")
.as_slice();
let mut candidates: Vec<CandidateInterval> = Vec::new();
for entity_id in 0..(workspace.n_entities as usize) {
let count = count_slice[entity_id] as usize;
let base = entity_id * MAX_CANDIDATES_PER_ENTITY as usize;
for i in 0..count {
candidates.push(slots[base + i]);
}
}
let stage_digests_host = workspace
.stage_digests_pinned
.as_ref()
.expect("guarded above")
.as_slice();
let mut residual_digest = [0u8; 32];
let mut sign_digest = [0u8; 32];
let mut detector_digest = [0u8; 32];
let mut consensus_digest = [0u8; 32];
residual_digest.copy_from_slice(&stage_digests_host[0..32]);
sign_digest.copy_from_slice(&stage_digests_host[32..64]);
detector_digest.copy_from_slice(&stage_digests_host[64..96]);
consensus_digest.copy_from_slice(&stage_digests_host[96..128]);
Ok(
dsfb_gpu_debug_core::casefile::build_throughput_compact_verdict_from_device_digests(
contract,
"cuda",
fixture,
residual_digest,
sign_digest,
detector_digest,
consensus_digest,
&candidates,
),
)
}
/// Host-side (CPU) stage timings for the D64 throughput builders, in
/// microseconds.
///
/// Companion to the device-side [`D64ThroughputStageTimings`], which the FFI
/// launcher fills in through its timings out-parameter.
#[derive(Clone, Copy, Debug, Default)]
pub struct D64ThroughputHostStageTimings {
    /// Wall-clock time spent packing the input trace into the pinned event
    /// shadow before the pipeline launch.
    pub host_input_staging_us: f32,
    /// Wall-clock time spent assembling the casefile from the device digests
    /// and the collected candidate list after the launch returns.
    pub bank_and_finalize_us: f32,
}
pub use crate::ffi::D64ThroughputStageTimingsFfi as D64ThroughputStageTimings;
/// Runs the D64 throughput pipeline on a pinned/async workspace using device
/// variant selector `0`, returning the compact casefile together with the
/// device- and host-side stage timings.
///
/// Host timings cover (a) packing `events` into the pinned event shadow and
/// (b) building the casefile from the device digests and candidates. Device
/// timings are whatever the FFI launcher writes into its timings
/// out-parameter.
///
/// # Errors
///
/// * Propagates errors from `GpuWorkspace::assert_compatible` and from the
///   workspace `ensure_*` buffer allocations.
/// * [`GpuError::InvalidInput`] if `workspace` was not built with pinned
///   shadows + stream, or has no tree-digest scratch.
/// * [`GpuError::KernelFailed`] carrying the raw status when the FFI launcher
///   returns non-zero.
///
/// # Panics
///
/// Panics if the pinned readback shadows are smaller than the workspace's
/// entity count / digest layout implies (they are indexed directly).
pub fn build_gpu_throughput_pinned_async_on_workspace_d64_tree_compact_timed(
    events: &[TraceEvent],
    contract: &Contract,
    workspace: &mut GpuWorkspace,
    fixture: &dsfb_gpu_debug_core::casefile::FixtureHashes,
) -> Result<
    (
        CaseFile,
        D64ThroughputStageTimings,
        D64ThroughputHostStageTimings,
    ),
    GpuError,
> {
    run_d64_tree_async_compact_timed(
        events,
        contract,
        workspace,
        fixture,
        D64CompactVariant::TreeCompact,
    )
}

/// Same as `build_gpu_throughput_pinned_async_on_workspace_d64_tree_compact_timed`,
/// but launches the D64 pipeline with device variant selector `1`.
///
/// # Errors
///
/// * Propagates errors from `GpuWorkspace::assert_compatible` and from the
///   workspace `ensure_*` buffer allocations.
/// * [`GpuError::InvalidInput`] if `workspace` was not built with pinned
///   shadows + stream, or has no tree-digest scratch.
/// * [`GpuError::KernelFailed`] carrying the raw status when the FFI launcher
///   returns non-zero.
///
/// # Panics
///
/// Panics if the pinned readback shadows are smaller than the workspace's
/// entity count / digest layout implies (they are indexed directly).
pub fn build_gpu_throughput_pinned_async_on_workspace_d64_compact_densor_compact_timed(
    events: &[TraceEvent],
    contract: &Contract,
    workspace: &mut GpuWorkspace,
    fixture: &dsfb_gpu_debug_core::casefile::FixtureHashes,
) -> Result<
    (
        CaseFile,
        D64ThroughputStageTimings,
        D64ThroughputHostStageTimings,
    ),
    GpuError,
> {
    run_d64_tree_async_compact_timed(
        events,
        contract,
        workspace,
        fixture,
        D64CompactVariant::DensorCompact,
    )
}

/// Device-side variant forwarded to
/// `dsfb_gpu_run_pipeline_throughput_d64_tree_async_on_workspace` as its
/// integer selector argument (the one immediately before the timings pointer).
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
enum D64CompactVariant {
    /// Selector `0` (the `..._d64_tree_compact_timed` entry point).
    TreeCompact,
    /// Selector `1` (the `..._d64_compact_densor_compact_timed` entry point).
    DensorCompact,
}

/// Shared implementation of the two `*_d64_*_compact_timed` entry points.
///
/// Validates and prepares `workspace`, packs `events` into the pinned event
/// shadow, launches the D64 tree-digest pipeline with `variant`'s selector,
/// then reads the per-entity candidates and the four stage digests back from
/// the pinned shadows and assembles the compact casefile.
#[allow(clippy::expect_used, clippy::too_many_lines)]
fn run_d64_tree_async_compact_timed(
    events: &[TraceEvent],
    contract: &Contract,
    workspace: &mut GpuWorkspace,
    fixture: &dsfb_gpu_debug_core::casefile::FixtureHashes,
    variant: D64CompactVariant,
) -> Result<
    (
        CaseFile,
        D64ThroughputStageTimings,
        D64ThroughputHostStageTimings,
    ),
    GpuError,
> {
    workspace.assert_compatible(contract)?;
    if !workspace.has_pinned_async() {
        return Err(GpuError::InvalidInput(
            "GpuWorkspace was not built with pinned shadows + stream; \
             call GpuWorkspace::new_with_pinned_async(contract)",
        ));
    }
    if !workspace.has_tree_digest() {
        return Err(GpuError::InvalidInput(
            "GpuWorkspace tree-digest scratch is not allocated; \
             call GpuWorkspace::new_with_pinned_async(contract) on a fresh workspace",
        ));
    }
    workspace.ensure_wide_detector_buffer()?;
    workspace.ensure_axis5_grid_sum_buffer()?;
    workspace.ensure_drift_buffer()?;
    workspace.ensure_detector_digest_compact_buffer()?;
    workspace.ensure_candidate_parallel_buffers()?;
    workspace.ensure_candidate_run_buffer()?;
    workspace.ensure_events_buffer(events.len() as u64)?;

    let n_entities = contract.n_entities as i32;
    let n_windows = contract.n_windows as i32;
    let window_size_ns: u64 = u64::from(contract.window_size_ms) * 1_000_000;

    // Host stage 1 (timed): pack the trace into the pinned event shadow.
    let t_input_staging = std::time::Instant::now();
    {
        let pinned = workspace
            .events_pinned
            .as_mut()
            .expect("ensure_events_buffer guarantees events_pinned is Some");
        pack_events_to_pinned_simd(events, pinned.as_mut_slice());
    }
    let host_input_staging_us = d64_elapsed_us(t_input_staging);

    let thresholds_ffi = DetectorThresholdsFfi::from(&DetectorThresholds::CANONICAL);
    let baseline = Baseline::CANONICAL;
    let candidate_cfg = CandidateConfig::CANONICAL;

    // Host-side pointers into the pinned shadows (the `has_pinned_async`
    // guard above is relied on to have populated them).
    let events_ptr = workspace
        .events_pinned
        .as_ref()
        .expect("guarded above")
        .as_ptr();
    let candidates_ptr = workspace
        .candidates_pinned
        .as_mut()
        .expect("guarded above")
        .as_mut_ptr();
    let count_ptr = workspace
        .candidate_count_pinned
        .as_mut()
        .expect("guarded above")
        .as_mut_ptr();
    let digests_ptr = workspace
        .stage_digests_pinned
        .as_mut()
        .expect("guarded above")
        .as_mut_ptr();

    let stream = workspace.stream_handle();
    let tree_chunk_size = workspace.tree_chunk_size();
    let tree_leaves_stride = workspace.tree_leaves_stride_bytes();
    let tree_scratch_stride = workspace.tree_scratch_stride_bytes();
    let d_tree_leaves = workspace.d_tree_leaves();
    let d_tree_scratch = workspace.d_tree_scratch();
    let d_detectors_wide = workspace.d_detectors_wide();
    #[allow(clippy::cast_ptr_alignment)]
    let d_axis5_grid_sum = workspace.d_axis5_grid_sum().cast::<i64>();
    let d_drift_buffer = workspace.d_drift_buffer();
    let d_detector_digest_compact = workspace.d_detector_digest_compact();
    let d_candidate_fired = workspace.d_candidate_fired();
    let d_candidate_boundaries = workspace.d_candidate_boundaries();
    let d_candidate_run_buffer = workspace.d_candidate_run_buffer();
    let d_candidate_run_count = workspace.d_candidate_run_count();
    let d_events = workspace.d_events();

    let n_events_u64 = events.len() as u64;
    // Spread the contract's total time span evenly over the events; an empty
    // trace uses 1 ns to avoid a division by zero.
    let ticks_per_event_ns: u64 = if n_events_u64 == 0 {
        1
    } else {
        (u64::from(contract.n_windows) * window_size_ns) / n_events_u64
    };
    // NOTE(review): named `profile_id` but carries `active_detector_count()`;
    // presumably the kernel keys the D64 profile off that count — confirm.
    let profile_id_i32 = DetectorProfile::D64.active_detector_count() as i32;
    let wide_mask_words_used_i32 = DetectorProfile::D64.mask_word_count() as i32;
    // Integer literal whose concrete type is inferred from the FFI parameter.
    let variant_selector = match variant {
        D64CompactVariant::TreeCompact => 0,
        D64CompactVariant::DensorCompact => 1,
    };
    let mut device_timings = D64ThroughputStageTimings::default();

    // SAFETY: every device pointer comes from `workspace`, whose buffers were
    // set up by the `ensure_*` calls above; the host pointers point into the
    // pinned shadows; `thresholds_ffi` and `device_timings` are locals that
    // outlive the call. The pinned outputs are read right after this call
    // returns, so this relies on the launcher synchronising `stream` before
    // returning — NOTE(review): confirm on the CUDA side.
    #[allow(unsafe_code)]
    let status: c_int = unsafe {
        crate::ffi::dsfb_gpu_run_pipeline_throughput_d64_tree_async_on_workspace(
            workspace.d_features(),
            workspace.d_residuals(),
            workspace.d_signs(),
            d_drift_buffer,
            d_detectors_wide,
            workspace.d_consensus(),
            d_axis5_grid_sum,
            d_detector_digest_compact,
            d_candidate_fired,
            d_candidate_boundaries,
            d_candidate_run_buffer,
            d_candidate_run_count,
            workspace.d_candidates(),
            workspace.d_candidate_count(),
            workspace.d_stage_digests(),
            d_tree_leaves,
            tree_leaves_stride,
            d_tree_scratch,
            tree_scratch_stride,
            d_events,
            events_ptr,
            n_events_u64,
            ticks_per_event_ns,
            window_size_ns,
            n_entities,
            n_windows,
            contract.ewma_alpha_q16_raw,
            baseline.latency_us,
            baseline.error_rate_q16_raw,
            std::ptr::from_ref(&thresholds_ffi),
            candidate_cfg.min_detector_count as i32,
            candidate_cfg.min_residual_q_raw,
            candidate_cfg.min_length_windows as i32,
            MAX_CANDIDATES_PER_ENTITY,
            profile_id_i32,
            wide_mask_words_used_i32,
            candidates_ptr,
            count_ptr,
            digests_ptr,
            stream,
            tree_chunk_size,
            variant_selector,
            std::ptr::from_mut(&mut device_timings),
        )
    };
    if status != 0 {
        return Err(GpuError::KernelFailed(status));
    }

    // Gather candidates: each entity owns a fixed run of
    // `MAX_CANDIDATES_PER_ENTITY` slots, of which the first `count` are live.
    let max_per_entity = MAX_CANDIDATES_PER_ENTITY as usize;
    let entity_count = workspace.n_entities as usize;
    let count_slice = workspace
        .candidate_count_pinned
        .as_ref()
        .expect("guarded above")
        .as_slice();
    let slots = workspace
        .candidates_pinned
        .as_ref()
        .expect("guarded above")
        .as_slice();
    let mut candidates: Vec<CandidateInterval> = Vec::new();
    for (entity_id, &count) in count_slice[..entity_count].iter().enumerate() {
        let base = entity_id * max_per_entity;
        // Clamp to the per-entity capacity: an over-reported count would
        // otherwise read the next entity's slots (or past the buffer end).
        // NOTE(review): confirm whether the kernel can report counts above
        // capacity (e.g. an atomic counter that keeps incrementing when full).
        let live = (count as usize).min(max_per_entity);
        candidates.extend_from_slice(&slots[base..base + live]);
    }

    let [residual_digest, sign_digest, detector_digest, consensus_digest] =
        d64_split_stage_digests(
            workspace
                .stage_digests_pinned
                .as_ref()
                .expect("guarded above")
                .as_slice(),
        );

    // Host stage 2 (timed): bank the digests/candidates into the casefile.
    let t_bank = std::time::Instant::now();
    let case = dsfb_gpu_debug_core::casefile::build_throughput_compact_verdict_from_device_digests(
        contract,
        "cuda",
        fixture,
        residual_digest,
        sign_digest,
        detector_digest,
        consensus_digest,
        &candidates,
    );
    let bank_and_finalize_us = d64_elapsed_us(t_bank);

    let host_timings = D64ThroughputHostStageTimings {
        host_input_staging_us,
        bank_and_finalize_us,
    };
    Ok((case, device_timings, host_timings))
}

/// Splits the pinned stage-digest readback into four 32-byte digests, in the
/// order residual, sign, detector, consensus (bytes `0..32`, `32..64`,
/// `64..96`, `96..128`).
///
/// # Panics
///
/// Panics if `stage_digests` is shorter than 128 bytes.
fn d64_split_stage_digests(stage_digests: &[u8]) -> [[u8; 32]; 4] {
    let mut digests = [[0u8; 32]; 4];
    for (digest, bytes) in digests
        .iter_mut()
        .zip(stage_digests[..128].chunks_exact(32))
    {
        digest.copy_from_slice(bytes);
    }
    digests
}

/// Microseconds elapsed since `start`, as `f32` (nanoseconds / 1000).
#[allow(clippy::cast_precision_loss)]
fn d64_elapsed_us(start: std::time::Instant) -> f32 {
    (start.elapsed().as_nanos() as u64) as f32 / 1_000.0_f32
}
#[cfg(test)]
mod s_perf_13_pack_equivalence_tests {
    use super::*;
    use dsfb_gpu_debug_core::event::TraceEvent;

    /// Reference packer the SIMD path is compared against: converts one event
    /// at a time and writes `min(events.len(), pinned.len())` slots, leaving
    /// any remaining slots untouched.
    fn pack_events_to_pinned_scalar(events: &[TraceEvent], pinned: &mut [GpuTraceEventCompact]) {
        pinned
            .iter_mut()
            .zip(events)
            .for_each(|(slot, event)| *slot = GpuTraceEventCompact::from_trace_event(event));
    }

    /// Builds `n` deterministic pseudo-random events from a 64-bit LCG
    /// (Knuth's MMIX constants), so every run packs identical input.
    fn synthetic_events(n: usize) -> Vec<TraceEvent> {
        let mut lcg: u64 = 0xD5FB_D5FB_D5FB_D5FB;
        (0..n)
            .map(|idx| {
                lcg = lcg
                    .wrapping_mul(6_364_136_223_846_793_005)
                    .wrapping_add(1_442_695_040_888_963_407);
                let error_code: u16 = u16::from((lcg & (1 << 7)) != 0);
                TraceEvent::new(
                    (idx as u64) * 1000 + (lcg >> 16),
                    (idx as u32) % 1024,
                    (idx as u32) % 16,
                    lcg,
                    lcg.wrapping_shr(8),
                    ((lcg >> 32) as u32) % 100_000,
                    200,
                    error_code,
                    0,
                    0,
                )
            })
            .collect()
    }

    /// The SIMD packer must match the scalar reference exactly across sizes
    /// that straddle chunk boundaries (including 0 and odd tails).
    #[test]
    fn packed_bytes_byte_identical_to_old_path() {
        let sizes = [0_usize, 1, 7, 8, 9, 64, 257, 4096, 65_537];
        for n in sizes {
            let events = synthetic_events(n);
            let mut expected = vec![GpuTraceEventCompact::default(); n];
            let mut actual = vec![GpuTraceEventCompact::default(); n];
            pack_events_to_pinned_scalar(&events, &mut expected);
            pack_events_to_pinned_simd(&events, &mut actual);
            assert_eq!(
                expected, actual,
                "SIMD pack diverged from scalar baseline at n_events={n}; \
                 panel-locked S-PERF.13 N8 + P6 contract violated"
            );
        }
    }

    /// With an oversized destination, only the first `events.len()` slots may
    /// change; the tail must stay at its default value.
    #[test]
    fn pack_does_not_write_past_events_len() {
        const N_EVENTS: usize = 17;
        const N_SLOTS: usize = 64;
        let events = synthetic_events(N_EVENTS);
        let mut pinned = vec![GpuTraceEventCompact::default(); N_SLOTS];
        pack_events_to_pinned_simd(&events, &mut pinned);

        let mut expected_head = vec![GpuTraceEventCompact::default(); N_EVENTS];
        pack_events_to_pinned_scalar(&events, &mut expected_head);

        let (head, tail) = pinned.split_at(N_EVENTS);
        assert_eq!(head, &expected_head[..]);
        for (offset, slot) in tail.iter().enumerate() {
            let i = N_EVENTS + offset;
            assert_eq!(
                *slot,
                GpuTraceEventCompact::default(),
                "pinned shadow slot {i} written past events.len()"
            );
        }
    }

    /// Perturbing any single event's latency must change the packed output,
    /// guarding against the SIMD path silently dropping an element.
    #[test]
    fn changing_any_event_byte_changes_packed_output() {
        let n = 32;
        let original_events = synthetic_events(n);
        let mut reference = vec![GpuTraceEventCompact::default(); n];
        pack_events_to_pinned_simd(&original_events, &mut reference);
        for mutate_idx in 0..n {
            let mut mutated_events = original_events.clone();
            let target = &mut mutated_events[mutate_idx];
            target.latency_us = target.latency_us.wrapping_add(1);
            let mut mutated = vec![GpuTraceEventCompact::default(); n];
            pack_events_to_pinned_simd(&mutated_events, &mut mutated);
            assert_ne!(
                reference, mutated,
                "mutating event index {mutate_idx} produced byte-identical packed output; \
                 SIMD pack may be skipping a chunk boundary"
            );
        }
    }
}