#![cfg(feature = "cuda")]
use core::ffi::c_int;
use std::vec;
use std::vec::Vec;
use dsfb_gpu_debug_core::candidate::{CandidateConfig, CandidateInterval};
use dsfb_gpu_debug_core::consensus::ConsensusCell;
use dsfb_gpu_debug_core::contract::Contract;
use dsfb_gpu_debug_core::detector::{DetectorCell, DetectorThresholds};
use dsfb_gpu_debug_core::event::GpuTraceEventCompact;
use dsfb_gpu_debug_core::hash::sha256;
use dsfb_gpu_debug_core::residual::{Baseline, ResidualCell};
use dsfb_gpu_debug_core::sign::SignCell;
use dsfb_gpu_debug_core::window::WindowFeature;
use crate::ffi;
use crate::ffi::DetectorThresholdsFfi;
use crate::GpuError;
/// Result of trying to capture the throughput pipeline as a CUDA graph.
///
/// `Captured` carries the SHA-256 plan hash the executable was built from;
/// `Demoted` means capture was refused and explains why.
#[derive(Clone, Debug)]
pub enum GraphCaptureStatus {
    /// A graph executable exists; `plan_hash` identifies the plan it encodes.
    Captured { plan_hash: [u8; 32] },
    /// Capture was refused; `reason` is a human-readable explanation.
    Demoted { reason: String },
}
/// Version tag emitted as the first `graph_version=...` line of the plan text
/// hashed by `compute_throughput_graph_plan_hash`; changing it changes every
/// throughput plan hash.
const GRAPH_PLAN_VERSION: &str = "DSFB-GPU-DEBUG-GRAPH-V1";
/// Uploads `DetectorThresholds::CANONICAL` to the device at most once per
/// process and reports whether that upload returned status 0.
///
/// The first call's outcome (success *or* failure) is cached in a `OnceLock`;
/// every later call returns the cached value without calling the FFI again.
fn ensure_canonical_thresholds_uploaded() -> bool {
    use std::sync::OnceLock;
    static UPLOADED: OnceLock<bool> = OnceLock::new();
    let outcome = UPLOADED.get_or_init(|| {
        let canonical = DetectorThresholdsFfi::from(&DetectorThresholds::CANONICAL);
        // SAFETY: `canonical` is a live local for the whole call; the FFI only
        // reads through the pointer.
        #[allow(unsafe_code)]
        let rc: core::ffi::c_int = unsafe {
            ffi::dsfb_gpu_upload_detector_thresholds(std::ptr::from_ref(&canonical))
        };
        rc == 0
    });
    *outcome
}
/// Computes the SHA-256 "plan hash" that identifies a throughput CUDA-graph plan.
///
/// The hashed text is a newline-separated list of `key=value` lines: the graph
/// version tag, emission and numeric mode, the dimensions taken from `contract`,
/// the fixed kernel sequence, several layout flags, and whether constant
/// thresholds are in use. Any byte change to this text changes the hash, so the
/// literals below must stay byte-for-byte stable.
fn compute_throughput_graph_plan_hash(
contract: &Contract,
uses_const_thresholds: bool,
) -> [u8; 32] {
use core::fmt::Write;
let mut text = String::with_capacity(512);
text.push_str("graph_version=");
text.push_str(GRAPH_PLAN_VERSION);
text.push('\n');
text.push_str("emission_mode=Throughput\n");
text.push_str("numeric_mode=Q16.16\n");
// `fmt::Write` for `String` never fails, so the `fmt::Result`s are discarded.
// This plan always describes a single catalog.
let _ = writeln!(text, "n_catalogs={}", 1u32);
let _ = writeln!(text, "n_entities={}", contract.n_entities);
let _ = writeln!(text, "n_windows={}", contract.n_windows);
// The trailing `\` continuations join the literal into ONE line: the newline
// and leading whitespace after each `\` are not part of the string.
text.push_str(
"kernel_sequence=\
residual_field,drift_slew_sign,detector_motif,\
consensus_grid,candidate_collapse,\
digest_residual,digest_sign,digest_detector,digest_consensus\n",
);
text.push_str("uses_pre_alpha=false\n");
text.push_str("uses_fused_residual_sign=false\n");
text.push_str("uses_device_digests=true\n");
text.push_str("candidate_layout_version=2\n");
text.push_str("stage_digest_count=4\n");
// The only flag that varies at runtime.
text.push_str(if uses_const_thresholds {
"uses_const_thresholds=true\n"
} else {
"uses_const_thresholds=false\n"
});
sha256(text.as_bytes())
}
/// Maximum candidate intervals per entity. Sizes the candidate arenas
/// (`n_entities * MAX_CANDIDATES_PER_ENTITY` slots) and is passed to the FFI.
pub(crate) const MAX_CANDIDATES_PER_ENTITY: i32 = 16;
/// Tree-digest chunk size in bytes. Each per-stage leaves/scratch region holds
/// `ceil(worst_stage_bytes / TREE_DIGEST_CHUNK_SIZE)` 32-byte leaves.
pub(crate) const TREE_DIGEST_CHUNK_SIZE: u32 = 16_384;
/// Fixed header bytes added to each per-stage tree-digest scratch region.
/// NOTE(review): the meaning of the `18 + 4 + 4 + 4` components is not visible
/// in this file — confirm against the device-side layout.
pub(crate) const TREE_DIGEST_HEADER_BYTES: u64 = 18 + 4 + 4 + 4;
/// Worst-case bytes per cell for any digested stage; used to size the
/// tree-digest regions in `new_with_pinned_async`.
pub(crate) const TREE_DIGEST_WORST_CELL_BYTES: u64 = 264;
/// Device and host state for running the single-catalog pipeline.
///
/// Owns raw device allocations made through `ffi`; they are released in `Drop`.
/// Several device buffers are allocated lazily by the `ensure_*` methods and
/// stay null until then.
pub struct GpuWorkspace {
/// Core arena allocated by `ffi::dsfb_gpu_workspace_alloc` in `new`.
d_features: *mut WindowFeature,
/// Core arena allocated by `ffi::dsfb_gpu_workspace_alloc` in `new`.
d_residuals: *mut ResidualCell,
/// Core arena allocated by `ffi::dsfb_gpu_workspace_alloc` in `new`.
d_signs: *mut SignCell,
/// Core arena allocated by `ffi::dsfb_gpu_workspace_alloc` in `new`.
d_detectors: *mut DetectorCell,
/// Core arena allocated by `ffi::dsfb_gpu_workspace_alloc` in `new`.
d_consensus: *mut ConsensusCell,
/// Candidate arena (`n_entities * MAX_CANDIDATES_PER_ENTITY` slots).
d_candidates: *mut CandidateInterval,
/// Per-entity candidate counts (one `i32` per entity).
d_candidate_count: *mut i32,
/// Four 32-byte stage digests (`4 * 32` bytes), allocated in `new`.
d_stage_digests: *mut u8,
/// `n_entities * n_windows * 4` bytes, allocated in `new`.
d_drifts: *mut u8,
/// Pinned host shadows; all four are set by `new_with_pinned_async`.
pub(crate) features_pinned: Option<crate::pinned::PinnedHostBuf<WindowFeature>>,
pub(crate) candidates_pinned: Option<crate::pinned::PinnedHostBuf<CandidateInterval>>,
pub(crate) candidate_count_pinned: Option<crate::pinned::PinnedHostBuf<i32>>,
pub(crate) stage_digests_pinned: Option<crate::pinned::PinnedHostBuf<u8>>,
/// Stream handle from `dsfb_gpu_create_stream`; `0` means no stream.
pub(crate) stream: u64,
/// Captured throughput graph executable; `0` means none.
pub(crate) graph_exec: u64,
/// Plan hash of the captured graph; set together with `graph_exec`.
pub(crate) graph_plan_hash: Option<[u8; 32]>,
/// Whether the process-wide canonical-threshold upload succeeded.
pub(crate) const_thresholds_uploaded: bool,
/// Tree-digest leaves region: 4 strides of `tree_leaves_stride_bytes`.
pub(crate) d_tree_leaves: *mut u8,
/// Tree-digest scratch region: 4 strides of `tree_scratch_stride_bytes`.
pub(crate) d_tree_scratch: *mut u8,
/// Tree-digest chunk size; `0` until `new_with_pinned_async` sets it.
pub(crate) tree_chunk_size: u32,
pub(crate) tree_leaves_stride_bytes: u64,
pub(crate) tree_scratch_stride_bytes: u64,
/// Lazy (zero-filled) wide detector arena; see `ensure_wide_detector_buffer`.
pub(crate) d_detectors_wide: *mut dsfb_gpu_debug_core::detector::DetectorCellWide,
/// Lazy, 8 bytes per window; see `ensure_axis5_grid_sum_buffer`.
pub(crate) d_axis5_grid_sum: *mut u8,
/// Lazy, one `i32` per cell; see `ensure_drift_buffer`.
pub(crate) d_drift_buffer: *mut i32,
/// Lazy, 18 bytes per cell; see `ensure_detector_digest_compact_buffer`.
pub(crate) d_detector_digest_compact: *mut u8,
/// Lazy, 1 byte per cell; see `ensure_candidate_parallel_buffers`.
pub(crate) d_candidate_fired: *mut u8,
/// Lazy, 8 bytes per candidate slot; see `ensure_candidate_parallel_buffers`.
pub(crate) d_candidate_boundaries: *mut u8,
/// Lazy, 8 bytes per candidate slot; see `ensure_candidate_run_buffer`.
pub(crate) d_candidate_run_buffer: *mut u8,
/// Lazy, one `i32` per entity; see `ensure_candidate_run_buffer`.
pub(crate) d_candidate_run_count: *mut i32,
/// Lazy event arena; capacity (in events) is `d_events_capacity`.
pub(crate) d_events: *mut GpuTraceEventCompact,
pub(crate) d_events_capacity: u64,
/// Pinned host shadow for `d_events`.
pub(crate) events_pinned: Option<crate::pinned::PinnedHostBuf<GpuTraceEventCompact>>,
/// Host-side mirrors sized in `new` (presumably filled by readback code
/// elsewhere in the crate — TODO confirm).
pub(crate) residuals: Vec<ResidualCell>,
pub(crate) signs: Vec<SignCell>,
pub(crate) detectors: Vec<DetectorCell>,
pub(crate) consensus: Vec<ConsensusCell>,
pub(crate) candidate_buf: Vec<CandidateInterval>,
pub(crate) candidate_count: Vec<i32>,
/// Dimensions copied from the contract the workspace was built for.
pub n_entities: u32,
pub n_windows: u32,
}
impl GpuWorkspace {
/// Allocates the core device arenas and host mirrors for a workspace shaped
/// by `contract`.
///
/// Allocates the seven core arenas (one FFI call), a `4 * 32`-byte
/// stage-digest buffer, and an `n_entities * n_windows * 4`-byte drift buffer.
/// The stream, pinned shadows and tree-digest regions are added by
/// [`GpuWorkspace::new_with_pinned_async`]. The remaining device pointers stay
/// null until their `ensure_*` method is called.
///
/// # Errors
/// * [`GpuError::InvalidInput`] if `n_entities` or `n_windows` exceeds
///   `i32::MAX` (the FFI takes them as `i32`).
/// * [`GpuError::KernelFailed`] with the FFI status if a device allocation
///   fails. Allocations made earlier in this call are released first.
#[allow(clippy::too_many_lines)]
pub fn new(contract: &Contract) -> Result<Self, GpuError> {
    // An `as i32` cast would silently wrap values above `i32::MAX` into
    // negative counts handed to the device allocator; reject them up front.
    let n_entities_i32 = i32::try_from(contract.n_entities)
        .map_err(|_| GpuError::InvalidInput("contract.n_entities exceeds i32::MAX"))?;
    let n_windows_i32 = i32::try_from(contract.n_windows)
        .map_err(|_| GpuError::InvalidInput("contract.n_windows exceeds i32::MAX"))?;
    let total = (contract.n_entities as usize) * (contract.n_windows as usize);
    let candidate_capacity =
        (contract.n_entities as usize) * (MAX_CANDIDATES_PER_ENTITY as usize);
    let mut d_features: *mut WindowFeature = std::ptr::null_mut();
    let mut d_residuals: *mut ResidualCell = std::ptr::null_mut();
    let mut d_signs: *mut SignCell = std::ptr::null_mut();
    let mut d_detectors: *mut DetectorCell = std::ptr::null_mut();
    let mut d_consensus: *mut ConsensusCell = std::ptr::null_mut();
    let mut d_candidates: *mut CandidateInterval = std::ptr::null_mut();
    let mut d_candidate_count: *mut i32 = std::ptr::null_mut();
    // SAFETY: every out-pointer refers to a live local the allocator writes into.
    #[allow(unsafe_code)]
    let status: c_int = unsafe {
        ffi::dsfb_gpu_workspace_alloc(
            n_entities_i32,
            n_windows_i32,
            MAX_CANDIDATES_PER_ENTITY,
            std::ptr::from_mut(&mut d_features),
            std::ptr::from_mut(&mut d_residuals),
            std::ptr::from_mut(&mut d_signs),
            std::ptr::from_mut(&mut d_detectors),
            std::ptr::from_mut(&mut d_consensus),
            std::ptr::from_mut(&mut d_candidates),
            std::ptr::from_mut(&mut d_candidate_count),
        )
    };
    if status != 0 {
        return Err(GpuError::KernelFailed(status));
    }
    let mut d_stage_digests: *mut u8 = std::ptr::null_mut();
    // SAFETY: `d_stage_digests` is a live local the allocator writes into.
    #[allow(unsafe_code)]
    let dig_status: c_int =
        unsafe { ffi::dsfb_gpu_alloc_bytes(4 * 32, std::ptr::from_mut(&mut d_stage_digests)) };
    if dig_status != 0 {
        // SAFETY: the core arenas were allocated above and are not stored anywhere else.
        #[allow(unsafe_code)]
        unsafe {
            ffi::dsfb_gpu_workspace_free(
                d_features,
                d_residuals,
                d_signs,
                d_detectors,
                d_consensus,
                d_candidates,
                d_candidate_count,
            );
        }
        return Err(GpuError::KernelFailed(dig_status));
    }
    let drift_bytes = (total as u64) * 4;
    let mut d_drifts: *mut u8 = std::ptr::null_mut();
    // SAFETY: `d_drifts` is a live local the allocator writes into.
    #[allow(unsafe_code)]
    let drift_status: c_int =
        unsafe { ffi::dsfb_gpu_alloc_bytes(drift_bytes, std::ptr::from_mut(&mut d_drifts)) };
    if drift_status != 0 {
        // SAFETY: everything freed here was allocated above in this call and
        // has not been handed out.
        #[allow(unsafe_code)]
        unsafe {
            ffi::dsfb_gpu_workspace_free(
                d_features,
                d_residuals,
                d_signs,
                d_detectors,
                d_consensus,
                d_candidates,
                d_candidate_count,
            );
            ffi::dsfb_gpu_free_bytes(d_stage_digests);
        }
        return Err(GpuError::KernelFailed(drift_status));
    }
    Ok(Self {
        d_features,
        d_residuals,
        d_signs,
        d_detectors,
        d_consensus,
        d_candidates,
        d_candidate_count,
        d_stage_digests,
        d_drifts,
        features_pinned: None,
        candidates_pinned: None,
        candidate_count_pinned: None,
        stage_digests_pinned: None,
        stream: 0,
        graph_exec: 0,
        graph_plan_hash: None,
        const_thresholds_uploaded: false,
        d_tree_leaves: std::ptr::null_mut(),
        d_tree_scratch: std::ptr::null_mut(),
        tree_chunk_size: 0,
        tree_leaves_stride_bytes: 0,
        tree_scratch_stride_bytes: 0,
        d_detectors_wide: std::ptr::null_mut(),
        d_axis5_grid_sum: std::ptr::null_mut(),
        d_drift_buffer: std::ptr::null_mut(),
        d_detector_digest_compact: std::ptr::null_mut(),
        d_candidate_fired: std::ptr::null_mut(),
        d_candidate_boundaries: std::ptr::null_mut(),
        d_candidate_run_buffer: std::ptr::null_mut(),
        d_candidate_run_count: std::ptr::null_mut(),
        d_events: std::ptr::null_mut(),
        d_events_capacity: 0,
        events_pinned: None,
        residuals: vec![ResidualCell::default(); total],
        signs: vec![SignCell::default(); total],
        detectors: vec![DetectorCell::default(); total],
        consensus: vec![ConsensusCell::default(); total],
        candidate_buf: vec![CandidateInterval::default(); candidate_capacity],
        candidate_count: vec![0i32; contract.n_entities as usize],
        n_entities: contract.n_entities,
        n_windows: contract.n_windows,
    })
}
/// Builds a workspace like [`GpuWorkspace::new`] and also prepares the async path.
///
/// This adds the four pinned host shadows, a dedicated stream, the cached
/// process-wide canonical-threshold upload, and the tree-digest leaves and
/// scratch regions. The tree-digest regions are sized for the worst-case stage
/// (`TREE_DIGEST_WORST_CELL_BYTES` per cell).
///
/// # Errors
/// Propagates errors from [`GpuWorkspace::new`] and the pinned-host allocations.
/// Returns [`GpuError::KernelFailed`] with the FFI status if stream creation or
/// either tree-digest allocation fails. On any error the partially built
/// workspace is dropped, and its `Drop` releases what was already allocated.
pub fn new_with_pinned_async(contract: &Contract) -> Result<Self, GpuError> {
    let mut ws = Self::new(contract)?;
    let entities = contract.n_entities as usize;
    let cells = entities * (contract.n_windows as usize);
    let candidate_slots = entities * (MAX_CANDIDATES_PER_ENTITY as usize);

    ws.features_pinned = Some(crate::pinned::PinnedHostBuf::new(cells)?);
    ws.candidates_pinned = Some(crate::pinned::PinnedHostBuf::new(candidate_slots)?);
    ws.candidate_count_pinned = Some(crate::pinned::PinnedHostBuf::new(entities)?);
    ws.stage_digests_pinned = Some(crate::pinned::PinnedHostBuf::new(4 * 32)?);

    let mut new_stream: u64 = 0;
    // SAFETY: `new_stream` is a live local the FFI writes the handle into.
    #[allow(unsafe_code)]
    let stream_rc: c_int =
        unsafe { ffi::dsfb_gpu_create_stream(std::ptr::from_mut(&mut new_stream)) };
    if stream_rc != 0 {
        return Err(GpuError::KernelFailed(stream_rc));
    }
    ws.stream = new_stream;
    ws.const_thresholds_uploaded = ensure_canonical_thresholds_uploaded();

    // Size each of the four per-stage tree-digest regions for the worst stage.
    ws.tree_chunk_size = TREE_DIGEST_CHUNK_SIZE;
    let cells_u64 = u64::from(contract.n_entities) * u64::from(contract.n_windows);
    let chunk_count = (cells_u64 * TREE_DIGEST_WORST_CELL_BYTES)
        .div_ceil(u64::from(TREE_DIGEST_CHUNK_SIZE))
        .max(1);
    ws.tree_leaves_stride_bytes = chunk_count * 32;
    ws.tree_scratch_stride_bytes = TREE_DIGEST_HEADER_BYTES + chunk_count * 32;

    let mut leaves: *mut u8 = std::ptr::null_mut();
    // SAFETY: `leaves` is a live local the allocator writes into.
    #[allow(unsafe_code)]
    let leaves_rc: c_int = unsafe {
        ffi::dsfb_gpu_alloc_bytes(
            4 * ws.tree_leaves_stride_bytes,
            std::ptr::from_mut(&mut leaves),
        )
    };
    if leaves_rc != 0 {
        return Err(GpuError::KernelFailed(leaves_rc));
    }
    ws.d_tree_leaves = leaves;

    let mut scratch: *mut u8 = std::ptr::null_mut();
    // SAFETY: `scratch` is a live local the allocator writes into.
    #[allow(unsafe_code)]
    let scratch_rc: c_int = unsafe {
        ffi::dsfb_gpu_alloc_bytes(
            4 * ws.tree_scratch_stride_bytes,
            std::ptr::from_mut(&mut scratch),
        )
    };
    if scratch_rc != 0 {
        return Err(GpuError::KernelFailed(scratch_rc));
    }
    ws.d_tree_scratch = scratch;
    Ok(ws)
}
/// Checks that this workspace was sized for `contract`.
///
/// # Errors
/// Returns [`GpuError::InvalidInput`] when the entity count or the window
/// count differs from the dimensions the workspace was built with.
pub fn assert_compatible(&self, contract: &Contract) -> Result<(), GpuError> {
    let expected = (self.n_entities, self.n_windows);
    let supplied = (contract.n_entities, contract.n_windows);
    if supplied != expected {
        return Err(GpuError::InvalidInput(
            "GpuWorkspace dimensions do not match the supplied contract",
        ));
    }
    Ok(())
}
/// Device pointer to the `WindowFeature` arena allocated in `new`.
#[inline]
pub(crate) fn d_features(&self) -> *mut WindowFeature {
    self.d_features
}
/// Device pointer to the `ResidualCell` arena allocated in `new`.
#[inline]
pub(crate) fn d_residuals(&self) -> *mut ResidualCell {
    self.d_residuals
}
/// Device pointer to the `SignCell` arena allocated in `new`.
#[inline]
pub(crate) fn d_signs(&self) -> *mut SignCell {
    self.d_signs
}
/// Device pointer to the `DetectorCell` arena allocated in `new`.
#[inline]
pub(crate) fn d_detectors(&self) -> *mut DetectorCell {
    self.d_detectors
}
/// Device pointer to the `ConsensusCell` arena allocated in `new`.
#[inline]
pub(crate) fn d_consensus(&self) -> *mut ConsensusCell {
    self.d_consensus
}
/// Device pointer to the candidate-interval arena allocated in `new`.
#[inline]
pub(crate) fn d_candidates(&self) -> *mut CandidateInterval {
    self.d_candidates
}
/// Device pointer to the per-entity candidate counts allocated in `new`.
#[inline]
pub(crate) fn d_candidate_count(&self) -> *mut i32 {
    self.d_candidate_count
}
/// Device pointer to the `4 * 32`-byte stage-digest buffer.
#[inline]
pub(crate) fn d_stage_digests(&self) -> *mut u8 {
    self.d_stage_digests
}
/// Device pointer to the drift buffer allocated in `new`.
#[inline]
pub(crate) fn d_drifts(&self) -> *mut u8 {
    self.d_drifts
}
/// Reports whether the async path is fully set up: all four pinned host
/// shadows exist and a non-zero stream handle was created.
#[must_use]
pub fn has_pinned_async(&self) -> bool {
    let shadows_present = [
        self.features_pinned.is_some(),
        self.candidates_pinned.is_some(),
        self.candidate_count_pinned.is_some(),
        self.stage_digests_pinned.is_some(),
    ];
    shadows_present.iter().all(|&present| present) && self.stream != 0
}
/// Raw stream handle (`0` when no stream has been created).
#[inline]
pub(crate) fn stream_handle(&self) -> u64 {
    self.stream
}
/// Device base of the tree-digest leaves region (null until
/// `new_with_pinned_async` allocates it).
#[inline]
pub(crate) fn d_tree_leaves(&self) -> *mut u8 {
    self.d_tree_leaves
}
/// Device base of the tree-digest scratch region (null until allocated).
#[inline]
pub(crate) fn d_tree_scratch(&self) -> *mut u8 {
    self.d_tree_scratch
}
/// Tree-digest chunk size; `0` when tree digests were never configured.
#[must_use]
pub fn tree_chunk_size(&self) -> u32 {
    self.tree_chunk_size
}
/// Byte stride between the four per-stage slices of the leaves region.
#[inline]
pub(crate) fn tree_leaves_stride_bytes(&self) -> u64 {
    self.tree_leaves_stride_bytes
}
/// Byte stride between the four per-stage slices of the scratch region.
#[inline]
pub(crate) fn tree_scratch_stride_bytes(&self) -> u64 {
    self.tree_scratch_stride_bytes
}
/// Whether tree digests are usable: a chunk size is set and both regions exist.
#[must_use]
pub fn has_tree_digest(&self) -> bool {
    let regions_allocated = !(self.d_tree_leaves.is_null() || self.d_tree_scratch.is_null());
    self.tree_chunk_size > 0 && regions_allocated
}
/// Reads the four 32-byte stage root digests from the pinned host shadow.
///
/// Returns `None` when the workspace has no pinned digest shadow, or when
/// that shadow is shorter than `4 * 32` bytes.
#[must_use]
pub fn last_d64_stage_root_digests(&self) -> Option<[[u8; 32]; 4]> {
    let shadow = self.stage_digests_pinned.as_ref()?.as_slice();
    let digest_bytes = shadow.get(..4 * 32)?;
    let mut roots = [[0u8; 32]; 4];
    for (root, chunk) in roots.iter_mut().zip(digest_bytes.chunks_exact(32)) {
        root.copy_from_slice(chunk);
    }
    Some(roots)
}
#[must_use = "the captured arena bytes are the test's load-bearing input"]
#[allow(unsafe_code)]
pub fn last_d64_detector_wide_arena_bytes(&self) -> Option<Result<Vec<u8>, GpuError>> {
if self.d_detectors_wide.is_null() {
return None;
}
let cell_size_bytes: usize =
std::mem::size_of::<dsfb_gpu_debug_core::detector::DetectorCellWide>();
let n_cells = (self.n_entities as usize) * (self.n_windows as usize);
let total_bytes = n_cells * cell_size_bytes;
let mut host = vec![0u8; total_bytes];
let status: c_int = unsafe {
ffi::dsfb_gpu_memcpy_d2h_bytes(
self.d_detectors_wide.cast::<u8>(),
host.as_mut_ptr(),
total_bytes as u64,
)
};
if status != 0 {
return Some(Err(GpuError::KernelFailed(status)));
}
Some(Ok(host))
}
#[must_use = "the captured arena bytes are the test's load-bearing input"]
#[allow(unsafe_code)]
pub fn last_d64_consensus_arena_bytes(&self) -> Option<Result<Vec<u8>, GpuError>> {
if self.d_consensus.is_null() {
return None;
}
let cell_size_bytes: usize =
std::mem::size_of::<dsfb_gpu_debug_core::consensus::ConsensusCell>();
let n_cells = (self.n_entities as usize) * (self.n_windows as usize);
let total_bytes = n_cells * cell_size_bytes;
let mut host = vec![0u8; total_bytes];
let status: c_int = unsafe {
ffi::dsfb_gpu_memcpy_d2h_bytes(
self.d_consensus.cast::<u8>(),
host.as_mut_ptr(),
total_bytes as u64,
)
};
if status != 0 {
return Some(Err(GpuError::KernelFailed(status)));
}
Some(Ok(host))
}
#[must_use = "the captured arena bytes are the test's load-bearing input"]
#[allow(unsafe_code)]
pub fn last_d64_axis5_grid_sum_bytes(&self) -> Option<Result<Vec<u8>, GpuError>> {
if self.d_axis5_grid_sum.is_null() {
return None;
}
let total_bytes = (self.n_windows as usize) * 8;
let mut host = vec![0u8; total_bytes];
let status: c_int = unsafe {
ffi::dsfb_gpu_memcpy_d2h_bytes(
self.d_axis5_grid_sum,
host.as_mut_ptr(),
total_bytes as u64,
)
};
if status != 0 {
return Some(Err(GpuError::KernelFailed(status)));
}
Some(Ok(host))
}
#[must_use = "the captured arena bytes are the test's load-bearing input"]
#[allow(unsafe_code)]
pub fn last_d64_candidate_fired_arena_bytes(&self) -> Option<Result<Vec<u8>, GpuError>> {
if self.d_candidate_fired.is_null() {
return None;
}
let n_cells = (self.n_entities as usize) * (self.n_windows as usize);
let total_bytes = n_cells;
let mut host = vec![0u8; total_bytes];
let status: c_int = unsafe {
ffi::dsfb_gpu_memcpy_d2h_bytes(
self.d_candidate_fired,
host.as_mut_ptr(),
total_bytes as u64,
)
};
if status != 0 {
return Some(Err(GpuError::KernelFailed(status)));
}
Some(Ok(host))
}
#[must_use = "the captured arena bytes are the test's load-bearing input"]
#[allow(unsafe_code)]
pub fn last_d64_candidates_arena_bytes(&self) -> Option<Result<Vec<u8>, GpuError>> {
if self.d_candidates.is_null() {
return None;
}
let n_slots = (self.n_entities as usize) * (MAX_CANDIDATES_PER_ENTITY as usize);
let total_bytes = n_slots * core::mem::size_of::<CandidateInterval>();
let mut host = vec![0u8; total_bytes];
let status: c_int = unsafe {
ffi::dsfb_gpu_memcpy_d2h_bytes(
self.d_candidates.cast::<u8>(),
host.as_mut_ptr(),
total_bytes as u64,
)
};
if status != 0 {
return Some(Err(GpuError::KernelFailed(status)));
}
Some(Ok(host))
}
#[must_use = "the captured arena bytes are the test's load-bearing input"]
#[allow(unsafe_code)]
pub fn last_d64_candidate_count_arena_bytes(&self) -> Option<Result<Vec<u8>, GpuError>> {
if self.d_candidate_count.is_null() {
return None;
}
let n_entries = self.n_entities as usize;
let total_bytes = n_entries * core::mem::size_of::<i32>();
let mut host = vec![0u8; total_bytes];
let status: c_int = unsafe {
ffi::dsfb_gpu_memcpy_d2h_bytes(
self.d_candidate_count.cast::<u8>(),
host.as_mut_ptr(),
total_bytes as u64,
)
};
if status != 0 {
return Some(Err(GpuError::KernelFailed(status)));
}
Some(Ok(host))
}
#[must_use = "the captured arena bytes are the test's load-bearing input"]
#[allow(unsafe_code)]
pub fn last_d64_detector_compact_pack_arena_bytes(&self) -> Option<Result<Vec<u8>, GpuError>> {
const COMPACT_BYTES_PER_CELL: usize = 18;
if self.d_detector_digest_compact.is_null() {
return None;
}
let n_cells = (self.n_entities as usize) * (self.n_windows as usize);
let total_bytes = n_cells * COMPACT_BYTES_PER_CELL;
let mut host = vec![0u8; total_bytes];
let status: c_int = unsafe {
ffi::dsfb_gpu_memcpy_d2h_bytes(
self.d_detector_digest_compact,
host.as_mut_ptr(),
total_bytes as u64,
)
};
if status != 0 {
return Some(Err(GpuError::KernelFailed(status)));
}
Some(Ok(host))
}
/// Device pointer to the wide detector arena (null until
/// `ensure_wide_detector_buffer` succeeds).
pub(crate) fn d_detectors_wide(&self) -> *mut dsfb_gpu_debug_core::detector::DetectorCellWide {
    self.d_detectors_wide
}
/// Lazily allocates and zero-fills the wide detector arena, with one
/// `DetectorCellWide` per entity × window cell. Does nothing if the arena
/// already exists.
///
/// # Errors
/// [`GpuError::KernelFailed`] with the FFI status if the allocation or the
/// zero-fill fails. When the zero-fill fails, the fresh allocation is freed.
pub fn ensure_wide_detector_buffer(&mut self) -> Result<(), GpuError> {
    use dsfb_gpu_debug_core::detector::DetectorCellWide;
    if self.has_wide_detector_buffer() {
        return Ok(());
    }
    let cell_bytes = core::mem::size_of::<DetectorCellWide>() as u64;
    let arena_bytes = u64::from(self.n_entities) * u64::from(self.n_windows) * cell_bytes;
    let mut raw: *mut u8 = std::ptr::null_mut();
    // SAFETY: `raw` is a live local the allocator writes the device pointer into.
    #[allow(unsafe_code)]
    let alloc_rc: c_int =
        unsafe { ffi::dsfb_gpu_alloc_bytes(arena_bytes, std::ptr::from_mut(&mut raw)) };
    if alloc_rc != 0 {
        return Err(GpuError::KernelFailed(alloc_rc));
    }
    // SAFETY: `raw` was just allocated with exactly `arena_bytes` bytes.
    #[allow(unsafe_code)]
    let zero_rc: c_int = unsafe { ffi::dsfb_gpu_memset_bytes(raw, 0, arena_bytes) };
    if zero_rc != 0 {
        // SAFETY: `raw` came from the allocation above and is not stored anywhere.
        #[allow(unsafe_code)]
        unsafe {
            ffi::dsfb_gpu_free_bytes(raw);
        }
        return Err(GpuError::KernelFailed(zero_rc));
    }
    #[allow(clippy::cast_ptr_alignment)]
    let wide = raw.cast::<DetectorCellWide>();
    self.d_detectors_wide = wide;
    Ok(())
}
/// Whether the wide detector arena has been allocated.
#[must_use]
pub fn has_wide_detector_buffer(&self) -> bool {
    !self.d_detectors_wide.is_null()
}
/// Device pointer to the axis-5 grid-sum buffer (null until allocated).
pub(crate) fn d_axis5_grid_sum(&self) -> *mut u8 {
    self.d_axis5_grid_sum
}
/// Lazily allocates the axis-5 grid-sum buffer (8 bytes per window). The
/// contents are not zeroed here. Does nothing if the buffer already exists.
///
/// # Errors
/// [`GpuError::KernelFailed`] with the FFI status if the allocation fails.
pub fn ensure_axis5_grid_sum_buffer(&mut self) -> Result<(), GpuError> {
    if self.has_axis5_grid_sum_buffer() {
        return Ok(());
    }
    let mut grid_sum: *mut u8 = std::ptr::null_mut();
    // SAFETY: `grid_sum` is a live local the allocator writes the device pointer into.
    #[allow(unsafe_code)]
    let rc: c_int = unsafe {
        ffi::dsfb_gpu_alloc_bytes(8 * u64::from(self.n_windows), std::ptr::from_mut(&mut grid_sum))
    };
    match rc {
        0 => {
            self.d_axis5_grid_sum = grid_sum;
            Ok(())
        }
        failed => Err(GpuError::KernelFailed(failed)),
    }
}
/// Whether the axis-5 grid-sum buffer has been allocated.
#[must_use]
pub fn has_axis5_grid_sum_buffer(&self) -> bool {
    !self.d_axis5_grid_sum.is_null()
}
/// Device pointer to the per-cell `i32` drift buffer (null until allocated).
pub(crate) fn d_drift_buffer(&self) -> *mut i32 {
    self.d_drift_buffer
}
/// Lazily allocates the drift buffer, with one `i32` per entity × window cell.
/// Does nothing if the buffer already exists.
///
/// # Errors
/// [`GpuError::KernelFailed`] with the FFI status if the allocation fails.
pub fn ensure_drift_buffer(&mut self) -> Result<(), GpuError> {
    if self.has_drift_buffer() {
        return Ok(());
    }
    let cells = u64::from(self.n_entities) * u64::from(self.n_windows);
    let buffer_bytes = cells * (core::mem::size_of::<i32>() as u64);
    let mut raw: *mut u8 = std::ptr::null_mut();
    // SAFETY: `raw` is a live local the allocator writes the device pointer into.
    #[allow(unsafe_code)]
    let rc: c_int =
        unsafe { ffi::dsfb_gpu_alloc_bytes(buffer_bytes, std::ptr::from_mut(&mut raw)) };
    if rc != 0 {
        return Err(GpuError::KernelFailed(rc));
    }
    #[allow(clippy::cast_ptr_alignment)]
    let drifts = raw.cast::<i32>();
    self.d_drift_buffer = drifts;
    Ok(())
}
/// Whether the drift buffer has been allocated.
#[must_use]
pub fn has_drift_buffer(&self) -> bool {
    !self.d_drift_buffer.is_null()
}
/// Device pointer to the compact detector-digest pack (null until allocated).
pub(crate) fn d_detector_digest_compact(&self) -> *mut u8 {
    self.d_detector_digest_compact
}
/// Bytes per cell in the V1 compact detector-digest pack.
pub(crate) const DETECTOR_WIDE_DIGEST_COMPACT_V1_BYTES: u64 = 18;
/// Lazily allocates the compact detector-digest pack
/// (`DETECTOR_WIDE_DIGEST_COMPACT_V1_BYTES` per cell). Does nothing if the
/// pack already exists.
///
/// # Errors
/// [`GpuError::KernelFailed`] with the FFI status if the allocation fails.
pub fn ensure_detector_digest_compact_buffer(&mut self) -> Result<(), GpuError> {
    if self.has_detector_digest_compact_buffer() {
        return Ok(());
    }
    let pack_bytes = Self::DETECTOR_WIDE_DIGEST_COMPACT_V1_BYTES
        * u64::from(self.n_entities)
        * u64::from(self.n_windows);
    let mut pack: *mut u8 = std::ptr::null_mut();
    // SAFETY: `pack` is a live local the allocator writes the device pointer into.
    #[allow(unsafe_code)]
    let rc: c_int =
        unsafe { ffi::dsfb_gpu_alloc_bytes(pack_bytes, std::ptr::from_mut(&mut pack)) };
    if rc != 0 {
        return Err(GpuError::KernelFailed(rc));
    }
    self.d_detector_digest_compact = pack;
    Ok(())
}
/// Whether the compact detector-digest pack has been allocated.
#[must_use]
pub fn has_detector_digest_compact_buffer(&self) -> bool {
    !self.d_detector_digest_compact.is_null()
}
/// Device pointer to the per-cell "fired" flags (1 byte per cell; null until allocated).
pub(crate) fn d_candidate_fired(&self) -> *mut u8 {
    self.d_candidate_fired
}
/// Device pointer to the candidate boundaries (8 bytes per candidate slot).
pub(crate) fn d_candidate_boundaries(&self) -> *mut u8 {
    self.d_candidate_boundaries
}
/// Lazily allocates the parallel-candidate buffers:
/// * `fired`: 1 byte per entity × window cell;
/// * `boundaries`: 8 bytes per candidate slot
///   (`n_entities * MAX_CANDIDATES_PER_ENTITY`).
///
/// # Errors
/// [`GpuError::KernelFailed`] with the FFI status. If the boundaries allocation
/// fails, the fired buffer is freed and reset to null, so either both buffers
/// exist or neither does.
pub fn ensure_candidate_parallel_buffers(&mut self) -> Result<(), GpuError> {
    if self.has_candidate_parallel_buffers() {
        return Ok(());
    }
    if self.d_candidate_fired.is_null() {
        let fired_bytes = u64::from(self.n_entities) * u64::from(self.n_windows);
        let mut fired: *mut u8 = std::ptr::null_mut();
        // SAFETY: `fired` is a live local the allocator writes into.
        #[allow(unsafe_code)]
        let rc: c_int =
            unsafe { ffi::dsfb_gpu_alloc_bytes(fired_bytes, std::ptr::from_mut(&mut fired)) };
        if rc != 0 {
            return Err(GpuError::KernelFailed(rc));
        }
        self.d_candidate_fired = fired;
    }
    if !self.d_candidate_boundaries.is_null() {
        return Ok(());
    }
    let boundary_bytes = u64::from(self.n_entities) * (MAX_CANDIDATES_PER_ENTITY as u64) * 8;
    let mut boundaries: *mut u8 = std::ptr::null_mut();
    // SAFETY: `boundaries` is a live local the allocator writes into.
    #[allow(unsafe_code)]
    let rc: c_int = unsafe {
        ffi::dsfb_gpu_alloc_bytes(boundary_bytes, std::ptr::from_mut(&mut boundaries))
    };
    if rc != 0 {
        // SAFETY: `d_candidate_fired` came from `dsfb_gpu_alloc_bytes` and is
        // nulled immediately below, so it is freed exactly once.
        #[allow(unsafe_code)]
        unsafe {
            let _ = ffi::dsfb_gpu_free_bytes(self.d_candidate_fired);
        }
        self.d_candidate_fired = std::ptr::null_mut();
        return Err(GpuError::KernelFailed(rc));
    }
    self.d_candidate_boundaries = boundaries;
    Ok(())
}
/// Whether both parallel-candidate buffers are allocated.
#[must_use]
pub fn has_candidate_parallel_buffers(&self) -> bool {
    !(self.d_candidate_fired.is_null() || self.d_candidate_boundaries.is_null())
}
/// Device pointer to the candidate run buffer (8 bytes per candidate slot).
pub(crate) fn d_candidate_run_buffer(&self) -> *mut u8 {
    self.d_candidate_run_buffer
}
/// Device pointer to the per-entity run counts (one `i32` per entity).
pub(crate) fn d_candidate_run_count(&self) -> *mut i32 {
    self.d_candidate_run_count
}
/// Lazily allocates the candidate run buffer
/// (`n_entities * MAX_CANDIDATES_PER_ENTITY * 8` bytes) and the per-entity run
/// counts (`n_entities * 4` bytes).
///
/// # Errors
/// [`GpuError::KernelFailed`] with the FFI status. If the counts allocation
/// fails, the run buffer is freed and reset to null.
pub fn ensure_candidate_run_buffer(&mut self) -> Result<(), GpuError> {
    if self.has_candidate_run_buffer() {
        return Ok(());
    }
    if self.d_candidate_run_buffer.is_null() {
        let run_bytes = u64::from(self.n_entities) * (MAX_CANDIDATES_PER_ENTITY as u64) * 8;
        let mut runs: *mut u8 = std::ptr::null_mut();
        // SAFETY: `runs` is a live local the allocator writes into.
        #[allow(unsafe_code)]
        let rc: c_int =
            unsafe { ffi::dsfb_gpu_alloc_bytes(run_bytes, std::ptr::from_mut(&mut runs)) };
        if rc != 0 {
            return Err(GpuError::KernelFailed(rc));
        }
        self.d_candidate_run_buffer = runs;
    }
    if !self.d_candidate_run_count.is_null() {
        return Ok(());
    }
    let count_bytes = u64::from(self.n_entities) * 4;
    let mut counts: *mut u8 = std::ptr::null_mut();
    // SAFETY: `counts` is a live local the allocator writes into.
    #[allow(unsafe_code)]
    let rc: c_int =
        unsafe { ffi::dsfb_gpu_alloc_bytes(count_bytes, std::ptr::from_mut(&mut counts)) };
    if rc != 0 {
        // SAFETY: the run buffer came from `dsfb_gpu_alloc_bytes` and is nulled
        // immediately below, so it is freed exactly once.
        #[allow(unsafe_code)]
        unsafe {
            let _ = ffi::dsfb_gpu_free_bytes(self.d_candidate_run_buffer);
        }
        self.d_candidate_run_buffer = std::ptr::null_mut();
        return Err(GpuError::KernelFailed(rc));
    }
    #[allow(clippy::cast_ptr_alignment)]
    let run_counts = counts.cast::<i32>();
    self.d_candidate_run_count = run_counts;
    Ok(())
}
/// Whether both the run buffer and the run counts are allocated.
#[must_use]
pub fn has_candidate_run_buffer(&self) -> bool {
    !(self.d_candidate_run_buffer.is_null() || self.d_candidate_run_count.is_null())
}
/// Device pointer to the compact trace-event arena (null until
/// `ensure_events_buffer` succeeds).
pub(crate) fn d_events(&self) -> *mut GpuTraceEventCompact {
    self.d_events
}
/// Ensures a device event arena and a matching pinned host shadow, each with
/// room for at least `n_events` entries.
///
/// If the current pair is already large enough, it is reused. Otherwise both
/// are released and reallocated together, so the device arena and the pinned
/// shadow always have the same capacity. The pinned shadow is allocated first.
/// `d_events_capacity` is only updated after both allocations succeed.
///
/// # Errors
/// * [`GpuError::InvalidInput`] if `n_events` does not fit in `usize`, or if
///   the arena size in bytes overflows `u64`.
/// * Any error from the pinned-host allocation.
/// * [`GpuError::KernelFailed`] with the FFI status if the device allocation fails.
///
/// On error the workspace holds no event buffers (`has_events_buffer()` is false).
pub fn ensure_events_buffer(&mut self, n_events: u64) -> Result<(), GpuError> {
    if self.has_events_buffer() && self.d_events_capacity >= n_events {
        return Ok(());
    }
    let host_len = usize::try_from(n_events)
        .map_err(|_| GpuError::InvalidInput("n_events does not fit in usize"))?;
    let bytes = n_events
        .checked_mul(core::mem::size_of::<GpuTraceEventCompact>() as u64)
        .ok_or(GpuError::InvalidInput("events buffer size overflows u64"))?;
    // Drop the old pair *together*. Keeping a stale, smaller host shadow next
    // to a larger device arena would let a later readback overflow it.
    self.release_events_buffer();
    let pinned = crate::pinned::PinnedHostBuf::new(host_len)?;
    let mut ptr: *mut u8 = std::ptr::null_mut();
    // SAFETY: `ptr` is a live local the allocator writes the device pointer into.
    #[allow(unsafe_code)]
    let status: c_int =
        unsafe { ffi::dsfb_gpu_alloc_bytes(bytes, std::ptr::from_mut(&mut ptr)) };
    if status != 0 {
        return Err(GpuError::KernelFailed(status));
    }
    #[allow(clippy::cast_ptr_alignment)]
    let typed = ptr.cast::<GpuTraceEventCompact>();
    self.d_events = typed;
    self.events_pinned = Some(pinned);
    self.d_events_capacity = n_events;
    Ok(())
}
/// Frees the device event arena (if any), drops the pinned shadow, and resets
/// the recorded capacity to zero.
fn release_events_buffer(&mut self) {
    if !self.d_events.is_null() {
        // SAFETY: `d_events` came from `dsfb_gpu_alloc_bytes` and is nulled
        // immediately below, so it is freed exactly once.
        #[allow(unsafe_code)]
        unsafe {
            let _ = ffi::dsfb_gpu_free_bytes(self.d_events.cast::<u8>());
        }
    }
    self.d_events = std::ptr::null_mut();
    self.d_events_capacity = 0;
    self.events_pinned = None;
}
/// Whether both the device event arena and its pinned host shadow exist.
#[must_use]
pub fn has_events_buffer(&self) -> bool {
    !self.d_events.is_null() && self.events_pinned.is_some()
}
/// Whether the canonical detector thresholds were uploaded successfully when
/// the async path was set up.
#[must_use]
pub fn has_const_thresholds(&self) -> bool {
    self.const_thresholds_uploaded
}
/// Test hook that overrides the recorded constant-threshold upload state.
#[doc(hidden)]
pub fn force_const_thresholds_uploaded_for_test(&mut self, value: bool) {
    self.const_thresholds_uploaded = value;
}
/// Whether a throughput graph executable has been captured.
#[must_use]
pub fn has_graph(&self) -> bool {
    self.graph_exec > 0
}
/// Plan hash of the captured graph, or `None` if no graph was captured.
#[must_use]
pub fn graph_plan_hash(&self) -> Option<[u8; 32]> {
    self.graph_plan_hash
}
/// Captures the throughput pipeline as a CUDA graph executable via
/// `ffi::dsfb_gpu_try_capture_throughput_graph`, or returns the cached one.
///
/// Requires a workspace built with `GpuWorkspace::new_with_pinned_async`. If a
/// graph is already held, its stored plan hash is returned and nothing is
/// recaptured. A new plan hash is computed from `contract` and from
/// `const_thresholds_uploaded`.
///
/// # Errors
/// `GpuError::InvalidInput` if `contract` does not match the workspace
/// dimensions, or if the pinned shadows or stream are missing.
///
/// A refused capture (non-zero FFI status, or a zero executable handle) is not
/// an error. It yields `Ok(GraphCaptureStatus::Demoted { .. })` and leaves the
/// workspace without a graph.
#[allow(clippy::expect_used, clippy::too_many_lines)]
pub fn try_capture_throughput_graph(
&mut self,
contract: &Contract,
) -> Result<GraphCaptureStatus, GpuError> {
self.assert_compatible(contract)?;
if !self.has_pinned_async() {
return Err(GpuError::InvalidInput(
"GpuWorkspace was not built with pinned shadows + stream; \
call GpuWorkspace::new_with_pinned_async(contract)",
));
}
// Already captured: reuse the executable and its stored hash.
// NOTE(review): the cached hash is not recomputed, so a later change to
// `const_thresholds_uploaded` (e.g. via the test hook) is not reflected.
if self.graph_exec != 0 {
if let Some(plan_hash) = self.graph_plan_hash {
return Ok(GraphCaptureStatus::Captured { plan_hash });
}
}
let uses_const = self.const_thresholds_uploaded;
let plan_hash = compute_throughput_graph_plan_hash(contract, uses_const);
let thresholds_ffi = DetectorThresholdsFfi::from(&DetectorThresholds::CANONICAL);
let baseline = Baseline::CANONICAL;
let candidate_cfg = CandidateConfig::CANONICAL;
// The `expect`s below cannot fire: `has_pinned_async()` was checked above.
let h_features_ptr = self
.features_pinned
.as_ref()
.expect("has_pinned_async guarantees features_pinned is Some")
.as_ptr();
let h_candidates_ptr = self
.candidates_pinned
.as_mut()
.expect("has_pinned_async guarantees candidates_pinned is Some")
.as_mut_ptr();
let h_count_ptr = self
.candidate_count_pinned
.as_mut()
.expect("has_pinned_async guarantees candidate_count_pinned is Some")
.as_mut_ptr();
let h_digests_ptr = self
.stage_digests_pinned
.as_mut()
.expect("has_pinned_async guarantees stage_digests_pinned is Some")
.as_mut_ptr();
// Dimensions are passed as `i32`. NOTE(review): values above `i32::MAX`
// would wrap here; `new` is the place to reject them.
let n_entities = self.n_entities as i32;
let n_windows = self.n_windows as i32;
let stream = self.stream;
let mut graph_exec: u64 = 0;
// All device/host pointers are owned by `self`, and `thresholds_ffi` lives
// until after the call. The positional argument order must match the C
// prototype exactly.
#[allow(unsafe_code)]
let status: c_int = unsafe {
ffi::dsfb_gpu_try_capture_throughput_graph(
std::ptr::from_mut(&mut graph_exec),
self.d_features,
self.d_residuals,
self.d_signs,
self.d_detectors,
self.d_consensus,
self.d_candidates,
self.d_candidate_count,
self.d_stage_digests,
h_features_ptr,
n_entities,
n_windows,
contract.ewma_alpha_q16_raw,
baseline.latency_us,
baseline.error_rate_q16_raw,
std::ptr::from_ref(&thresholds_ffi),
candidate_cfg.min_detector_count as i32,
candidate_cfg.min_residual_q_raw,
candidate_cfg.min_length_windows as i32,
MAX_CANDIDATES_PER_ENTITY,
h_candidates_ptr,
h_count_ptr,
h_digests_ptr,
stream,
c_int::from(uses_const),
)
};
// Refusal is reported as a demotion, not an error.
// NOTE(review): if the FFI returns a non-zero status together with a non-zero
// handle, that handle is neither stored nor destroyed here — confirm the
// C side cleans up on failure.
if status != 0 || graph_exec == 0 {
return Ok(GraphCaptureStatus::Demoted {
reason: format!("cuda graph capture refused (cudaError {status})"),
});
}
self.graph_exec = graph_exec;
self.graph_plan_hash = Some(plan_hash);
Ok(GraphCaptureStatus::Captured { plan_hash })
}
/// Raw graph executable handle (`0` when no graph has been captured).
#[inline]
pub(crate) fn graph_exec(&self) -> u64 {
    self.graph_exec
}
/// Host-side consensus cells.
#[must_use]
pub fn consensus(&self) -> &[ConsensusCell] {
    self.consensus.as_slice()
}
/// Mutable access to the host-side consensus cells.
#[must_use]
pub fn consensus_mut(&mut self) -> &mut [ConsensusCell] {
    self.consensus.as_mut_slice()
}
}
/// Device arenas and host mirrors sized for `n_catalogs` catalogs that share
/// one contract shape.
///
/// The device allocations are made in `new` and released in `Drop`.
pub struct BatchedGpuWorkspace {
/// Core arenas from `ffi::dsfb_gpu_workspace_alloc_batched`.
d_features: *mut WindowFeature,
d_residuals: *mut ResidualCell,
d_signs: *mut SignCell,
d_detectors: *mut DetectorCell,
d_consensus: *mut ConsensusCell,
d_candidates: *mut CandidateInterval,
d_candidate_count: *mut i32,
/// `4 * 32 * n_catalogs` bytes of stage digests.
d_stage_digests: *mut u8,
/// Host mirrors holding `n_catalogs` times the per-catalog element count.
pub(crate) features: Vec<WindowFeature>,
pub(crate) residuals: Vec<ResidualCell>,
pub(crate) signs: Vec<SignCell>,
pub(crate) detectors: Vec<DetectorCell>,
pub(crate) consensus: Vec<ConsensusCell>,
pub(crate) candidate_buf: Vec<CandidateInterval>,
pub(crate) candidate_count: Vec<i32>,
/// Batch size and per-catalog dimensions.
pub n_catalogs: u32,
pub n_entities: u32,
pub n_windows: u32,
}
impl BatchedGpuWorkspace {
    /// Allocates device arenas and host mirrors for `n_catalogs` catalogs,
    /// each shaped by `contract`.
    ///
    /// # Errors
    /// * [`GpuError::InvalidInput`] if `n_catalogs`, `n_entities` or
    ///   `n_windows` exceeds `i32::MAX` (the FFI takes them as `i32`).
    /// * [`GpuError::KernelFailed`] with the FFI status if a device allocation
    ///   fails. The core arenas are released if the digest allocation fails.
    pub fn new(n_catalogs: u32, contract: &Contract) -> Result<Self, GpuError> {
        // An `as i32` cast would silently wrap values above `i32::MAX` into
        // negative counts handed to the device allocator; reject them up front.
        let n_catalogs_i32 = i32::try_from(n_catalogs)
            .map_err(|_| GpuError::InvalidInput("n_catalogs exceeds i32::MAX"))?;
        let n_entities_i32 = i32::try_from(contract.n_entities)
            .map_err(|_| GpuError::InvalidInput("contract.n_entities exceeds i32::MAX"))?;
        let n_windows_i32 = i32::try_from(contract.n_windows)
            .map_err(|_| GpuError::InvalidInput("contract.n_windows exceeds i32::MAX"))?;
        let per_catalog: usize = (contract.n_entities as usize) * (contract.n_windows as usize);
        let total: usize = (n_catalogs as usize) * per_catalog;
        let candidate_capacity_per_catalog: usize =
            (contract.n_entities as usize) * (MAX_CANDIDATES_PER_ENTITY as usize);
        let candidate_total: usize = (n_catalogs as usize) * candidate_capacity_per_catalog;
        let mut d_features: *mut WindowFeature = std::ptr::null_mut();
        let mut d_residuals: *mut ResidualCell = std::ptr::null_mut();
        let mut d_signs: *mut SignCell = std::ptr::null_mut();
        let mut d_detectors: *mut DetectorCell = std::ptr::null_mut();
        let mut d_consensus: *mut ConsensusCell = std::ptr::null_mut();
        let mut d_candidates: *mut CandidateInterval = std::ptr::null_mut();
        let mut d_candidate_count: *mut i32 = std::ptr::null_mut();
        // SAFETY: every out-pointer refers to a live local the allocator writes into.
        #[allow(unsafe_code)]
        let status: c_int = unsafe {
            ffi::dsfb_gpu_workspace_alloc_batched(
                n_catalogs_i32,
                n_entities_i32,
                n_windows_i32,
                MAX_CANDIDATES_PER_ENTITY,
                std::ptr::from_mut(&mut d_features),
                std::ptr::from_mut(&mut d_residuals),
                std::ptr::from_mut(&mut d_signs),
                std::ptr::from_mut(&mut d_detectors),
                std::ptr::from_mut(&mut d_consensus),
                std::ptr::from_mut(&mut d_candidates),
                std::ptr::from_mut(&mut d_candidate_count),
            )
        };
        if status != 0 {
            return Err(GpuError::KernelFailed(status));
        }
        let mut d_stage_digests: *mut u8 = std::ptr::null_mut();
        // SAFETY: `d_stage_digests` is a live local the allocator writes into.
        #[allow(unsafe_code)]
        let dig_status: c_int = unsafe {
            ffi::dsfb_gpu_alloc_bytes(
                4 * 32 * u64::from(n_catalogs),
                std::ptr::from_mut(&mut d_stage_digests),
            )
        };
        if dig_status != 0 {
            // SAFETY: the core arenas were allocated above and are not stored anywhere else.
            #[allow(unsafe_code)]
            unsafe {
                ffi::dsfb_gpu_workspace_free(
                    d_features,
                    d_residuals,
                    d_signs,
                    d_detectors,
                    d_consensus,
                    d_candidates,
                    d_candidate_count,
                );
            }
            return Err(GpuError::KernelFailed(dig_status));
        }
        Ok(Self {
            d_features,
            d_residuals,
            d_signs,
            d_detectors,
            d_consensus,
            d_candidates,
            d_candidate_count,
            d_stage_digests,
            features: vec![WindowFeature::default(); total],
            residuals: vec![ResidualCell::default(); total],
            signs: vec![SignCell::default(); total],
            detectors: vec![DetectorCell::default(); total],
            consensus: vec![ConsensusCell::default(); total],
            candidate_buf: vec![CandidateInterval::default(); candidate_total],
            candidate_count: vec![0i32; (n_catalogs as usize) * (contract.n_entities as usize)],
            n_catalogs,
            n_entities: contract.n_entities,
            n_windows: contract.n_windows,
        })
    }
    /// Number of entity × window cells in one catalog.
    #[must_use]
    pub const fn per_catalog_cells(&self) -> usize {
        (self.n_entities as usize) * (self.n_windows as usize)
    }
    /// Number of candidate slots in one catalog
    /// (`n_entities * MAX_CANDIDATES_PER_ENTITY`).
    #[must_use]
    pub const fn per_catalog_candidate_slots(&self) -> usize {
        (self.n_entities as usize) * (MAX_CANDIDATES_PER_ENTITY as usize)
    }
    /// Device pointer to the batched `WindowFeature` arena.
    pub(crate) fn d_features(&self) -> *mut WindowFeature {
        self.d_features
    }
    /// Device pointer to the batched `ResidualCell` arena.
    pub(crate) fn d_residuals(&self) -> *mut ResidualCell {
        self.d_residuals
    }
    /// Device pointer to the batched `SignCell` arena.
    pub(crate) fn d_signs(&self) -> *mut SignCell {
        self.d_signs
    }
    /// Device pointer to the batched `DetectorCell` arena.
    pub(crate) fn d_detectors(&self) -> *mut DetectorCell {
        self.d_detectors
    }
    /// Device pointer to the batched `ConsensusCell` arena.
    pub(crate) fn d_consensus(&self) -> *mut ConsensusCell {
        self.d_consensus
    }
    /// Device pointer to the batched candidate-interval arena.
    pub(crate) fn d_candidates(&self) -> *mut CandidateInterval {
        self.d_candidates
    }
    /// Device pointer to the batched per-entity candidate counts.
    pub(crate) fn d_candidate_count(&self) -> *mut i32 {
        self.d_candidate_count
    }
    /// Device pointer to the `4 * 32 * n_catalogs`-byte stage-digest buffer.
    pub(crate) fn d_stage_digests(&self) -> *mut u8 {
        self.d_stage_digests
    }
}
impl Drop for BatchedGpuWorkspace {
    /// Returns the batched core arenas and the stage-digest buffer to the
    /// device allocator.
    fn drop(&mut self) {
        let Self {
            d_features,
            d_residuals,
            d_signs,
            d_detectors,
            d_consensus,
            d_candidates,
            d_candidate_count,
            d_stage_digests,
            ..
        } = *self;
        // SAFETY: all pointers were produced by the allocators in `new` and are
        // released exactly once, here.
        #[allow(unsafe_code)]
        unsafe {
            ffi::dsfb_gpu_workspace_free(
                d_features,
                d_residuals,
                d_signs,
                d_detectors,
                d_consensus,
                d_candidates,
                d_candidate_count,
            );
            ffi::dsfb_gpu_free_bytes(d_stage_digests);
        }
    }
}
impl Drop for GpuWorkspace {
    /// Releases every CUDA resource owned by the workspace.
    ///
    /// The captured graph executable and the stream are destroyed *before* the
    /// device memory they may reference is freed. Teardown therefore does not
    /// rely on the free path implicitly synchronizing with in-flight graph or
    /// stream work. Lazily allocated arenas that were never created are still
    /// null at this point. NOTE(review): this assumes `dsfb_gpu_free_bytes`
    /// treats a null pointer as a no-op — confirm on the C side.
    fn drop(&mut self) {
        // SAFETY: every handle/pointer below was produced by the matching FFI
        // allocator in this module (or is still 0/null) and is released once, here.
        #[allow(unsafe_code)]
        unsafe {
            if self.graph_exec != 0 {
                let _ = ffi::dsfb_gpu_destroy_throughput_graph(self.graph_exec);
            }
            if self.stream != 0 {
                let _ = ffi::dsfb_gpu_destroy_stream(self.stream);
            }
            ffi::dsfb_gpu_workspace_free(
                self.d_features,
                self.d_residuals,
                self.d_signs,
                self.d_detectors,
                self.d_consensus,
                self.d_candidates,
                self.d_candidate_count,
            );
            ffi::dsfb_gpu_free_bytes(self.d_stage_digests);
            ffi::dsfb_gpu_free_bytes(self.d_drifts);
            ffi::dsfb_gpu_free_bytes(self.d_tree_leaves);
            ffi::dsfb_gpu_free_bytes(self.d_tree_scratch);
            ffi::dsfb_gpu_free_bytes(self.d_detectors_wide.cast::<u8>());
            ffi::dsfb_gpu_free_bytes(self.d_axis5_grid_sum);
            ffi::dsfb_gpu_free_bytes(self.d_drift_buffer.cast::<u8>());
            ffi::dsfb_gpu_free_bytes(self.d_detector_digest_compact);
            ffi::dsfb_gpu_free_bytes(self.d_candidate_fired);
            ffi::dsfb_gpu_free_bytes(self.d_candidate_boundaries);
            ffi::dsfb_gpu_free_bytes(self.d_candidate_run_buffer);
            ffi::dsfb_gpu_free_bytes(self.d_candidate_run_count.cast::<u8>());
            ffi::dsfb_gpu_free_bytes(self.d_events.cast::<u8>());
        }
    }
}