#![allow(missing_docs)]
use crate::delta::TileVertexId;
use crate::evidence::LogEValue;
use core::mem::size_of;
/// Status code carried in the `status` field of a `TileReport`.
///
/// `#[repr(u8)]` together with the explicit discriminants pins every variant
/// to a fixed byte value in `0..=7`. The `From<u8>` impl in this file decodes
/// those bytes back into variants and maps any other byte to `Error`.
// NOTE(review): the meaning of each state is not described in this file; the
// names suggest a tile lifecycle — confirm against the code that sets them.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[repr(u8)]
pub enum TileStatus {
Idle = 0,
Processing = 1,
Complete = 2,
// Also the fallback produced when decoding an unrecognised byte.
Error = 3,
Waiting = 4,
Checkpointing = 5,
Recovering = 6,
Shutdown = 7,
}
impl From<u8> for TileStatus {
fn from(v: u8) -> Self {
match v {
0 => TileStatus::Idle,
1 => TileStatus::Processing,
2 => TileStatus::Complete,
3 => TileStatus::Error,
4 => TileStatus::Waiting,
5 => TileStatus::Checkpointing,
6 => TileStatus::Recovering,
7 => TileStatus::Shutdown,
_ => TileStatus::Error,
}
}
}
/// Compact witness record embedded in a `TileReport`.
///
/// The layout is fixed by `#[repr(C, align(8))]`, and a compile-time assertion
/// in this file requires the type to be exactly 16 bytes.
#[derive(Debug, Clone, Copy, Default)]
#[repr(C, align(8))]
pub struct WitnessFragment {
// First input to `compute_hash`.
pub seed: TileVertexId,
pub boundary_size: u16,
// `is_empty` reports true when this is zero.
pub cardinality: u16,
// Low 16 bits of the value produced by `compute_hash`; `new` leaves it at 0.
pub hash: u16,
// `AggregatedReport::merge` keeps the smallest value seen across reports.
pub local_min_cut: u16,
// Not part of the hash; `new` initialises it to 0.
pub component: u16,
// Always written as 0 by the constructors in this file.
pub _reserved: u16,
}
impl WitnessFragment {
    /// Builds a fragment from its seed, boundary size, cardinality and local
    /// min-cut value.
    ///
    /// `hash`, `component` and `_reserved` start at zero; call
    /// [`compute_hash`](Self::compute_hash) to fill in `hash`.
    #[inline]
    pub const fn new(
        seed: TileVertexId,
        boundary_size: u16,
        cardinality: u16,
        local_min_cut: u16,
    ) -> Self {
        Self {
            // Fields the caller does not supply.
            hash: 0,
            component: 0,
            _reserved: 0,
            // Fields taken straight from the arguments.
            seed,
            boundary_size,
            cardinality,
            local_min_cut,
        }
    }

    /// Recomputes `hash` from `seed`, `boundary_size`, `cardinality` and
    /// `local_min_cut`, in that order.
    ///
    /// The accumulator starts at `seed`; each following field is mixed in with
    /// a wrapping multiply by 31 and a wrapping add. Only the low 16 bits of
    /// the 32-bit result are stored.
    pub fn compute_hash(&mut self) {
        let mixed_fields = [self.boundary_size, self.cardinality, self.local_min_cut];
        let digest = mixed_fields.iter().fold(self.seed as u32, |acc, &field| {
            acc.wrapping_mul(31).wrapping_add(u32::from(field))
        });
        self.hash = (digest & 0xFFFF) as u16;
    }

    /// Returns `true` when `cardinality` is zero.
    #[inline]
    pub const fn is_empty(&self) -> bool {
        matches!(self.cardinality, 0)
    }
}
/// Fixed-layout status report for a single tile.
///
/// `#[repr(C, align(64))]` fixes the field order and gives the type 64-byte
/// alignment. A compile-time assertion in this file requires the size to be
/// exactly 64 bytes, so any field change must keep the total at 64.
#[derive(Debug, Clone, Copy)]
#[repr(C, align(64))]
pub struct TileReport {
// Recorded as `min_cut_tile` when this report supplies the smallest min cut
// during `AggregatedReport::merge`.
pub tile_id: u8,
// `new` starts at `Idle`; `set_complete` / `set_error` overwrite it.
pub status: TileStatus,
// NOTE(review): `generation` and `tick` are only zero-initialised in this
// file; their exact meaning comes from the producer — confirm.
pub generation: u16,
pub tick: u32,
// The three counts below are summed by `AggregatedReport::merge`.
pub num_vertices: u16,
pub num_edges: u16,
pub num_components: u16,
// Bit set built from the `GRAPH_*` constants on `TileReport`.
pub graph_flags: u16,
// `e_value_approx` divides this by 65536 and raises 2 to that power.
pub log_e_value: LogEValue,
pub obs_count: u16,
// `has_rejections` is true when this is non-zero.
pub rejected_count: u16,
pub witness: WitnessFragment,
// NOTE(review): the `_us` suffix presumably means microseconds — confirm.
pub delta_time_us: u16,
// Divisor in `processing_rate`; a value of 0 makes the rate 0.0.
pub tick_time_us: u16,
// Numerator in `processing_rate`.
pub deltas_processed: u16,
// NOTE(review): presumably kilobytes, judging by the `_kb` suffix — confirm.
pub memory_kb: u16,
pub ghost_vertices: u16,
pub ghost_edges: u16,
pub boundary_vertices: u16,
pub pending_sync: u16,
// Zero-filled by `new`.
pub _reserved: [u8; 8],
}
impl Default for TileReport {
fn default() -> Self {
Self::new(0)
}
}
impl TileReport {
pub const GRAPH_CONNECTED: u16 = 0x0001;
pub const GRAPH_DIRTY: u16 = 0x0002;
pub const GRAPH_FULL: u16 = 0x0004;
pub const GRAPH_HAS_GHOSTS: u16 = 0x0008;
#[inline]
pub const fn new(tile_id: u8) -> Self {
Self {
tile_id,
status: TileStatus::Idle,
generation: 0,
tick: 0,
num_vertices: 0,
num_edges: 0,
num_components: 0,
graph_flags: 0,
log_e_value: 0,
obs_count: 0,
rejected_count: 0,
witness: WitnessFragment {
seed: 0,
boundary_size: 0,
cardinality: 0,
hash: 0,
local_min_cut: 0,
component: 0,
_reserved: 0,
},
delta_time_us: 0,
tick_time_us: 0,
deltas_processed: 0,
memory_kb: 0,
ghost_vertices: 0,
ghost_edges: 0,
boundary_vertices: 0,
pending_sync: 0,
_reserved: [0; 8],
}
}
#[inline]
pub fn set_complete(&mut self) {
self.status = TileStatus::Complete;
}
#[inline]
pub fn set_error(&mut self) {
self.status = TileStatus::Error;
}
#[inline]
pub fn set_connected(&mut self, connected: bool) {
if connected {
self.graph_flags |= Self::GRAPH_CONNECTED;
} else {
self.graph_flags &= !Self::GRAPH_CONNECTED;
}
}
#[inline]
pub const fn is_connected(&self) -> bool {
self.graph_flags & Self::GRAPH_CONNECTED != 0
}
#[inline]
pub const fn is_dirty(&self) -> bool {
self.graph_flags & Self::GRAPH_DIRTY != 0
}
pub fn e_value_approx(&self) -> f32 {
let log2_val = (self.log_e_value as f32) / 65536.0;
libm::exp2f(log2_val)
}
pub fn set_witness(&mut self, witness: WitnessFragment) {
self.witness = witness;
}
#[inline]
pub const fn get_witness(&self) -> &WitnessFragment {
&self.witness
}
#[inline]
pub const fn has_rejections(&self) -> bool {
self.rejected_count > 0
}
pub fn processing_rate(&self) -> f32 {
if self.tick_time_us == 0 {
0.0
} else {
(self.deltas_processed as f32) / (self.tick_time_us as f32)
}
}
}
/// Running aggregate built by merging per-tile `TileReport`s.
///
/// `Default` is implemented by hand rather than derived: a derived impl would
/// zero `global_min_cut`, and `merge` only lowers that field, so the minimum
/// would stay stuck at 0. The manual impl matches `AggregatedReport::new(0)`.
#[derive(Debug, Clone, Copy)]
#[repr(C)]
pub struct AggregatedReport {
    /// Sum of `num_vertices` over all merged reports.
    pub total_vertices: u32,
    /// Sum of `num_edges` over all merged reports.
    pub total_edges: u32,
    /// Sum of `num_components` over all merged reports.
    pub total_components: u16,
    /// Number of reports merged so far.
    pub tiles_reporting: u16,
    /// Number of merged reports whose status was `TileStatus::Error`.
    pub tiles_with_errors: u16,
    /// Number of merged reports with a non-zero `rejected_count`.
    pub tiles_with_rejections: u16,
    /// Sum of `log_e_value` over all merged reports.
    pub global_log_e: i64,
    /// Smallest `witness.local_min_cut` seen; `u16::MAX` before any merge.
    pub global_min_cut: u16,
    /// `tile_id` of the report that supplied `global_min_cut`.
    pub min_cut_tile: u8,
    /// Padding byte, always written as 0 by the constructors.
    pub _reserved: u8,
    /// Largest `tick_time_us` among the merged reports (a maximum, not a sum).
    pub total_time_us: u32,
    /// Tick this aggregate was created for.
    pub tick: u32,
}

impl Default for AggregatedReport {
    /// Returns an empty aggregate for tick 0.
    ///
    /// Every field is zero except `global_min_cut`, which starts at
    /// `u16::MAX` so that the first merged report can lower it.
    fn default() -> Self {
        Self {
            total_vertices: 0,
            total_edges: 0,
            total_components: 0,
            tiles_reporting: 0,
            tiles_with_errors: 0,
            tiles_with_rejections: 0,
            global_log_e: 0,
            // "No minimum observed yet" sentinel; must not be 0.
            global_min_cut: u16::MAX,
            min_cut_tile: 0,
            _reserved: 0,
            total_time_us: 0,
            tick: 0,
        }
    }
}
impl AggregatedReport {
    /// Creates an empty aggregate for `tick`.
    ///
    /// All counters start at zero; `global_min_cut` starts at `u16::MAX` so
    /// the first merged report can lower it.
    pub const fn new(tick: u32) -> Self {
        Self {
            total_vertices: 0,
            total_edges: 0,
            total_components: 0,
            tiles_reporting: 0,
            tiles_with_errors: 0,
            tiles_with_rejections: 0,
            global_log_e: 0,
            global_min_cut: u16::MAX,
            min_cut_tile: 0,
            _reserved: 0,
            total_time_us: 0,
            tick,
        }
    }

    /// Folds one tile's report into the aggregate.
    ///
    /// Counters use saturating arithmetic: they clamp at their maximum rather
    /// than panicking (debug builds) or wrapping (release builds). This
    /// matters most for `total_components`, a `u16` sum of `u16` values.
    pub fn merge(&mut self, report: &TileReport) {
        self.total_vertices = self
            .total_vertices
            .saturating_add(u32::from(report.num_vertices));
        self.total_edges = self
            .total_edges
            .saturating_add(u32::from(report.num_edges));
        self.total_components = self
            .total_components
            .saturating_add(report.num_components);
        self.tiles_reporting = self.tiles_reporting.saturating_add(1);

        if report.status == TileStatus::Error {
            self.tiles_with_errors = self.tiles_with_errors.saturating_add(1);
        }
        if report.rejected_count > 0 {
            self.tiles_with_rejections = self.tiles_with_rejections.saturating_add(1);
        }

        self.global_log_e = self
            .global_log_e
            .saturating_add(report.log_e_value as i64);

        // Strict `<` keeps the first tile that reached the current minimum.
        if report.witness.local_min_cut < self.global_min_cut {
            self.global_min_cut = report.witness.local_min_cut;
            self.min_cut_tile = report.tile_id;
        }

        // Tracks the slowest tile, not the sum of all tile times.
        self.total_time_us = self.total_time_us.max(u32::from(report.tick_time_us));
    }

    /// Returns `true` when exactly `expected_tiles` reports have been merged
    /// and none of them had status `TileStatus::Error`.
    pub fn all_complete(&self, expected_tiles: u16) -> bool {
        self.tiles_reporting == expected_tiles && self.tiles_with_errors == 0
    }

    /// Returns `2^(global_log_e / 65536)` as an `f64`.
    pub fn global_e_value(&self) -> f64 {
        let log2_val = (self.global_log_e as f64) / 65536.0;
        libm::exp2(log2_val)
    }
}
// Compile-time layout guards: the build fails if either type's size drifts
// from the value asserted here.
const _: () = assert!(
size_of::<TileReport>() == 64,
"TileReport must be exactly 64 bytes"
);
// WitnessFragment is embedded in TileReport, so its size is pinned as well.
const _: () = assert!(
size_of::<WitnessFragment>() == 16,
"WitnessFragment must be 16 bytes"
);
#[cfg(test)]
mod tests {
    use super::*;

    /// `TileReport` must occupy exactly 64 bytes.
    #[test]
    fn test_tile_report_size() {
        let bytes = size_of::<TileReport>();
        assert_eq!(bytes, 64);
    }

    /// `TileReport` must be 64-byte aligned.
    #[test]
    fn test_tile_report_alignment() {
        let alignment = core::mem::align_of::<TileReport>();
        assert_eq!(alignment, 64);
    }

    /// `WitnessFragment` must occupy exactly 16 bytes.
    #[test]
    fn test_witness_fragment_size() {
        let bytes = size_of::<WitnessFragment>();
        assert_eq!(bytes, 16);
    }

    /// A fresh report carries its tile id, is idle, and starts at tick 0.
    #[test]
    fn test_new_report() {
        let fresh = TileReport::new(5);
        assert_eq!(fresh.tile_id, 5);
        assert_eq!(fresh.status, TileStatus::Idle);
        assert_eq!(fresh.tick, 0);
    }

    /// The status setters overwrite the status field.
    #[test]
    fn test_set_status() {
        let mut tile = TileReport::new(0);

        tile.set_complete();
        assert_eq!(tile.status, TileStatus::Complete);

        tile.set_error();
        assert_eq!(tile.status, TileStatus::Error);
    }

    /// The connected flag starts cleared and follows `set_connected`.
    #[test]
    fn test_connected_flag() {
        let mut tile = TileReport::new(0);
        assert!(!tile.is_connected());

        for &wanted in &[true, false] {
            tile.set_connected(wanted);
            assert_eq!(tile.is_connected(), wanted);
        }
    }

    /// The constructor stores its arguments and hashing yields a non-zero
    /// value for this input.
    #[test]
    fn test_witness_fragment() {
        let mut fragment = WitnessFragment::new(10, 5, 20, 100);
        assert_eq!(fragment.seed, 10);
        assert_eq!(fragment.boundary_size, 5);
        assert_eq!(fragment.cardinality, 20);
        assert_eq!(fragment.local_min_cut, 100);

        fragment.compute_hash();
        assert_ne!(fragment.hash, 0);
    }

    /// Merging two reports sums the counts and keeps the smaller min cut.
    #[test]
    fn test_aggregated_report() {
        let build = |tile_id: u8, vertices: u16, edges: u16, min_cut: u16| {
            let mut tile = TileReport::new(tile_id);
            tile.num_vertices = vertices;
            tile.num_edges = edges;
            tile.witness.local_min_cut = min_cut;
            tile
        };
        let inputs = [build(0, 50, 100, 200), build(1, 75, 150, 150)];

        let mut summary = AggregatedReport::new(1);
        for tile in inputs.iter() {
            summary.merge(tile);
        }

        assert_eq!(summary.tiles_reporting, 2);
        assert_eq!(summary.total_vertices, 125);
        assert_eq!(summary.total_edges, 250);
        assert_eq!(summary.global_min_cut, 150);
        assert_eq!(summary.min_cut_tile, 1);
    }

    /// Every valid status byte survives a decode/encode round trip.
    #[test]
    fn test_tile_status_roundtrip() {
        (0u8..=7).for_each(|raw| {
            let decoded = TileStatus::from(raw);
            assert_eq!(decoded as u8, raw);
        });
    }

    /// 100 deltas over 50 time units gives a rate of 2.0.
    #[test]
    fn test_processing_rate() {
        let mut tile = TileReport::new(0);
        tile.tick_time_us = 50;
        tile.deltas_processed = 100;

        let rate = tile.processing_rate();
        assert!((rate - 2.0).abs() < 0.01);
    }
}