use ndarray::{ArrayView1, ArrayView2, Axis};
/// Which backend produced a minibatch's routing selections.
#[derive(Debug, PartialEq, Eq, Clone, Copy)]
pub enum ScoreRoutePath {
    /// Selections came back from the device (GPU) scoring path.
    Device,
    /// Selections were computed on the host CPU.
    Cpu,
}
/// Outcome of routing one minibatch: the per-row selections together with the
/// path taken and the transfer bookkeeping consumed by `ScoreRouteStats`.
#[derive(Debug, Clone)]
pub struct ScoreRouteResult {
    /// One `(atom index, signed score)` list per input row.
    pub selections: Vec<Vec<(u32, f32)>>,
    /// Backend that actually produced `selections`.
    pub path: ScoreRoutePath,
    /// Shape/cost plan computed for this minibatch.
    pub plan: gam_gpu::DictionaryScoreRoutePlan,
    /// Device-to-host bytes reported by the device path (0 on the CPU path).
    pub device_dtoh_bytes: usize,
    /// Size in bytes of the full dense f32 score block (`n_rows * n_items * 4`),
    /// i.e. what an unfused download would have cost (0 on the CPU path).
    pub unfused_score_dtoh_bytes: usize,
}
/// Running totals accumulated over many routed minibatches.
///
/// Every counter saturates instead of overflowing, both when recording a
/// single minibatch and when merging two accumulators with [`Self::absorb`].
#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
pub struct ScoreRouteStats {
    /// Number of minibatches recorded.
    pub minibatches: usize,
    /// Minibatches whose plan had `device_admitted == true`.
    pub admitted_minibatches: usize,
    /// Minibatches routed on [`ScoreRoutePath::Device`].
    pub device_minibatches: usize,
    /// Minibatches routed on [`ScoreRoutePath::Cpu`].
    pub cpu_minibatches: usize,
    /// Sum of `plan.n_rows * plan.n_items` over all recorded plans.
    pub score_elements: u128,
    /// Sum of `plan.tile_count` over all recorded plans.
    pub score_tiles: usize,
    /// Maximum `plan.peak_score_bytes` seen.
    pub peak_score_bytes: usize,
    /// Sum of `plan.dot_flops_lower_bound` over all recorded plans.
    pub dot_flops_lower_bound: u128,
    /// Sum of device-to-host bytes reported by recorded results.
    pub device_dtoh_bytes: u128,
    /// Sum over results of `unfused - device` transfer bytes (clamped at 0).
    pub unfused_score_dtoh_bytes_avoided: u128,
}

impl ScoreRouteStats {
    /// Records one minibatch from its plan and path only (no transfer accounting).
    pub fn record(&mut self, plan: gam_gpu::DictionaryScoreRoutePlan, path: ScoreRoutePath) {
        self.record_with_device_transfer(plan, path, 0, 0);
    }

    /// Records one minibatch, including the transfer figures carried by `result`.
    pub fn record_result(&mut self, result: &ScoreRouteResult) {
        self.record_with_device_transfer(
            result.plan,
            result.path,
            result.device_dtoh_bytes,
            result.unfused_score_dtoh_bytes,
        );
    }

    /// Shared implementation of [`Self::record`] and [`Self::record_result`].
    fn record_with_device_transfer(
        &mut self,
        plan: gam_gpu::DictionaryScoreRoutePlan,
        path: ScoreRoutePath,
        device_dtoh_bytes: usize,
        unfused_score_dtoh_bytes: usize,
    ) {
        // Saturate exactly like `absorb` does, so recording can never panic on
        // overflow in debug builds nor wrap in release builds.
        self.minibatches = self.minibatches.saturating_add(1);
        if plan.device_admitted {
            self.admitted_minibatches = self.admitted_minibatches.saturating_add(1);
        }
        match path {
            ScoreRoutePath::Device => {
                self.device_minibatches = self.device_minibatches.saturating_add(1);
            }
            ScoreRoutePath::Cpu => {
                self.cpu_minibatches = self.cpu_minibatches.saturating_add(1);
            }
        }
        self.score_elements = self
            .score_elements
            .saturating_add((plan.n_rows as u128).saturating_mul(plan.n_items as u128));
        self.score_tiles = self.score_tiles.saturating_add(plan.tile_count);
        self.peak_score_bytes = self.peak_score_bytes.max(plan.peak_score_bytes);
        self.dot_flops_lower_bound = self
            .dot_flops_lower_bound
            .saturating_add(plan.dot_flops_lower_bound);

        let device = device_dtoh_bytes as u128;
        let unfused = unfused_score_dtoh_bytes as u128;
        self.device_dtoh_bytes = self.device_dtoh_bytes.saturating_add(device);
        // Only the part of the dense download the device path actually skipped
        // counts; copying more than the unfused baseline contributes 0.
        self.unfused_score_dtoh_bytes_avoided = self
            .unfused_score_dtoh_bytes_avoided
            .saturating_add(unfused.saturating_sub(device));
    }

    /// Merges another accumulator into this one.
    ///
    /// Counters and sums are added with saturation; `peak_score_bytes` takes the maximum.
    pub fn absorb(&mut self, other: Self) {
        // Exhaustive destructuring: adding a field to the struct without
        // merging it here becomes a compile error instead of a silent omission.
        let Self {
            minibatches,
            admitted_minibatches,
            device_minibatches,
            cpu_minibatches,
            score_elements,
            score_tiles,
            peak_score_bytes,
            dot_flops_lower_bound,
            device_dtoh_bytes,
            unfused_score_dtoh_bytes_avoided,
        } = other;
        self.minibatches = self.minibatches.saturating_add(minibatches);
        self.admitted_minibatches = self.admitted_minibatches.saturating_add(admitted_minibatches);
        self.device_minibatches = self.device_minibatches.saturating_add(device_minibatches);
        self.cpu_minibatches = self.cpu_minibatches.saturating_add(cpu_minibatches);
        self.score_elements = self.score_elements.saturating_add(score_elements);
        self.score_tiles = self.score_tiles.saturating_add(score_tiles);
        self.peak_score_bytes = self.peak_score_bytes.max(peak_score_bytes);
        self.dot_flops_lower_bound = self
            .dot_flops_lower_bound
            .saturating_add(dot_flops_lower_bound);
        self.device_dtoh_bytes = self.device_dtoh_bytes.saturating_add(device_dtoh_bytes);
        self.unfused_score_dtoh_bytes_avoided = self
            .unfused_score_dtoh_bytes_avoided
            .saturating_add(unfused_score_dtoh_bytes_avoided);
    }
}
/// Bounded selector that keeps the `capacity` candidates with the largest
/// `|score|`, breaking magnitude ties in favour of the smaller atom index.
///
/// NaN scores rank below every real score (including zero), so they are only
/// retained when fewer than `capacity` non-NaN candidates have been offered.
#[derive(Clone, Debug)]
pub struct TopSSelector {
    /// Retained `(atom, signed score, rank key)` triples; unordered until `finish`.
    heap: Vec<(u32, f32, f32)>,
    /// Maximum number of retained candidates (always >= 1).
    capacity: usize,
    /// Index in `heap` of the current worst entry; only valid once `heap` is full.
    worst_idx: usize,
}

impl TopSSelector {
    /// Creates a selector retaining at most `capacity` entries (0 is bumped to 1).
    pub fn new(capacity: usize) -> Self {
        let capacity = capacity.max(1);
        Self {
            heap: Vec::with_capacity(capacity),
            capacity,
            worst_idx: 0,
        }
    }

    /// Ranking key for a score: its magnitude, or `-inf` for NaN.
    ///
    /// Mapping NaN away keeps every key comparable with `<`/`==`. Without it a
    /// NaN in the worst slot would never be displaced, because `x > NaN` is always false.
    #[inline]
    fn rank_key(score: f32) -> f32 {
        if score.is_nan() {
            f32::NEG_INFINITY
        } else {
            score.abs()
        }
    }

    /// True when `(mag_a, atom_a)` ranks strictly better than `(mag_b, atom_b)`.
    #[inline]
    fn beats(mag_a: f32, atom_a: u32, mag_b: f32, atom_b: u32) -> bool {
        mag_a > mag_b || (mag_a == mag_b && atom_a < atom_b)
    }

    /// Rescans the retained entries to locate the worst-ranked one.
    #[inline]
    fn recompute_worst(&mut self) {
        let mut worst = 0usize;
        for k in 1..self.heap.len() {
            let (atom_k, _, mag_k) = self.heap[k];
            let (atom_w, _, mag_w) = self.heap[worst];
            if Self::beats(mag_w, atom_w, mag_k, atom_k) {
                worst = k;
            }
        }
        self.worst_idx = worst;
    }

    /// Offers one candidate.
    ///
    /// The candidate is kept if there is room, or if it beats the current worst
    /// retained entry, which it then replaces.
    #[inline]
    pub fn offer(&mut self, atom: u32, score: f32) {
        let mag = Self::rank_key(score);
        if self.heap.len() < self.capacity {
            self.heap.push((atom, score, mag));
            if self.heap.len() == self.capacity {
                self.recompute_worst();
            }
            return;
        }
        let (w_atom, _, w_mag) = self.heap[self.worst_idx];
        if Self::beats(mag, atom, w_mag, w_atom) {
            self.heap[self.worst_idx] = (atom, score, mag);
            self.recompute_worst();
        }
    }

    /// Consumes the selector and returns `(atom, signed score)` pairs.
    ///
    /// Pairs are ordered by descending `|score|`, ties by ascending atom index.
    pub fn finish(mut self) -> Vec<(u32, f32)> {
        // Keys are never NaN or -0.0 (see `rank_key`), so `total_cmp` agrees with
        // `<`/`==` and the comparator is a genuine total order (no sort panic).
        self.heap
            .sort_by(|a, b| b.2.total_cmp(&a.2).then_with(|| a.0.cmp(&b.0)));
        self.heap.into_iter().map(|(a, s, _)| (a, s)).collect()
    }
}
/// Scores `row` against every atom (row) of `atoms_tile` and offers each
/// `(atom_offset + local_index, dot)` pair to `sel`.
///
/// The dot product runs over the first `row.len()` columns, accumulated left to right.
/// It panics if an atom has fewer columns than `row`; extra columns are ignored.
#[inline]
pub fn score_row_tile(
    row: ArrayView1<'_, f32>,
    atoms_tile: ArrayView2<'_, f32>,
    atom_offset: usize,
    sel: &mut TopSSelector,
) {
    let width = row.len();
    atoms_tile
        .outer_iter()
        .enumerate()
        .for_each(|(local, atom)| {
            // Plain sequential fold: same summation order (and rounding) as a
            // hand-written accumulator loop.
            let dot = (0..width).fold(0.0f32, |acc, c| acc + row[c] * atom[c]);
            sel.offer((atom_offset + local) as u32, dot);
        });
}
/// Returns the `s` atoms (rows of `decoder`) whose dot product with `row` has
/// the largest magnitude, as `(atom index, signed score)` pairs.
///
/// `decoder` is scanned in consecutive blocks of `tile` atoms; `tile == 0` is
/// treated as 1. The output ordering is the one produced by [`TopSSelector::finish`].
pub fn top_s_online(
    row: ArrayView1<'_, f32>,
    decoder: ArrayView2<'_, f32>,
    s: usize,
    tile: usize,
) -> Vec<(u32, f32)> {
    let n_atoms = decoder.nrows();
    let step = tile.max(1);
    let mut selector = TopSSelector::new(s);
    for block_start in (0..n_atoms).step_by(step) {
        let block_end = n_atoms.min(block_start + step);
        let block = decoder.slice(ndarray::s![block_start..block_end, ..]);
        score_row_tile(row, block, block_start, &mut selector);
    }
    selector.finish()
}
#[derive(Clone, Copy, Debug)]
pub struct TileScorer {
pub tile: usize,
pub active: usize,
}
impl TileScorer {
pub fn new(active: usize, tile: usize) -> Self {
Self {
tile: tile.max(1),
active: active.max(1),
}
}
pub fn route_row(
&self,
row: ArrayView1<'_, f32>,
decoder: ArrayView2<'_, f32>,
) -> Vec<(u32, f32)> {
top_s_online(row, decoder, self.active, self.tile)
}
pub fn route_minibatch(
&self,
rows: ArrayView2<'_, f32>,
decoder: ArrayView2<'_, f32>,
) -> Vec<Vec<(u32, f32)>> {
let b = rows.nrows();
let k = decoder.nrows();
let mut selectors: Vec<TopSSelector> =
(0..b).map(|_| TopSSelector::new(self.active)).collect();
let mut start = 0usize;
while start < k {
let end = (start + self.tile).min(k);
let tile_block = decoder.slice(ndarray::s![start..end, ..]);
let scores = rows.dot(&tile_block.t()); for (local, score_col) in scores.axis_iter(Axis(1)).enumerate() {
let atom = (start + local) as u32;
for (row_idx, &sc) in score_col.iter().enumerate() {
selectors[row_idx].offer(atom, sc);
}
}
start = end;
}
selectors.into_iter().map(TopSSelector::finish).collect()
}
pub fn route_minibatch_dispatch(
&self,
rows: ArrayView2<'_, f32>,
decoder: ArrayView2<'_, f32>,
) -> Result<Vec<Vec<(u32, f32)>>, String> {
Ok(self
.route_minibatch_with_mode(rows, decoder, gam_gpu::gpu_mode())?
.selections)
}
pub fn route_minibatch_with_mode(
&self,
rows: ArrayView2<'_, f32>,
decoder: ArrayView2<'_, f32>,
mode: gam_gpu::GpuMode,
) -> Result<ScoreRouteResult, String> {
let plan = gam_gpu::DictionaryScoreRoutePlan::default_for_shape(
rows.nrows(),
decoder.nrows(),
decoder.ncols(),
);
if mode == gam_gpu::GpuMode::Off {
return Ok(ScoreRouteResult {
selections: self.route_minibatch(rows, decoder),
path: ScoreRoutePath::Cpu,
plan,
device_dtoh_bytes: 0,
unfused_score_dtoh_bytes: 0,
});
}
if mode == gam_gpu::GpuMode::Required && !plan.device_admitted {
return Err(format!(
"sparse_dict route_minibatch GpuMode::Required: block of {}x{} = {} elems is \
below the device launch break-even (DEVICE_SCORE_BLOCK_MIN_ELEMS={}); refusing \
to silently run on the CPU",
plan.n_rows,
plan.n_items,
plan.n_rows.saturating_mul(plan.n_items),
plan.device_min_score_elems
));
}
#[cfg(target_os = "linux")]
{
if mode == gam_gpu::GpuMode::Required || plan.device_admitted {
match super::scoring_gpu::route_minibatch_required(
rows,
decoder,
self.active,
self.tile,
mode,
) {
Ok((routed, super::scoring_gpu::ScoreBlockPath::Device, dtoh_bytes)) => {
return Ok(ScoreRouteResult {
selections: routed,
path: ScoreRoutePath::Device,
plan,
device_dtoh_bytes: dtoh_bytes,
unfused_score_dtoh_bytes: plan
.n_rows
.saturating_mul(plan.n_items)
.saturating_mul(std::mem::size_of::<f32>()),
});
}
Ok((routed, super::scoring_gpu::ScoreBlockPath::Cpu, _)) => {
if mode == gam_gpu::GpuMode::Required {
return Err(
"sparse_dict route_minibatch Required mode returned CPU path"
.to_string(),
);
}
return Ok(ScoreRouteResult {
selections: routed,
path: ScoreRoutePath::Cpu,
plan,
device_dtoh_bytes: 0,
unfused_score_dtoh_bytes: 0,
});
}
Err(err) => {
if mode == gam_gpu::GpuMode::Required {
return Err(err.to_string());
}
}
}
}
}
#[cfg(not(target_os = "linux"))]
{
if mode == gam_gpu::GpuMode::Required {
return Err(
"sparse_dict route_minibatch GpuMode::Required: CUDA routing is only compiled \
on Linux"
.to_string(),
);
}
}
Ok(ScoreRouteResult {
selections: self.route_minibatch(rows, decoder),
path: ScoreRoutePath::Cpu,
plan,
device_dtoh_bytes: 0,
unfused_score_dtoh_bytes: 0,
})
}
}