use rttp::PulseFrameHeader;
use rayon::prelude::*;
/// Minimum frame length accepted by `on_pulse_received`, and the byte offset
/// at which it splits the payload off from the frame.
const RPKI_HEADER_SIZE: usize = 64;
/// `ParallelScanResult::is_safe` requires `anomaly_score` to be strictly below
/// this value.
const QUARANTINE_THRESHOLD: f32 = 0.95;
/// Value `on_pulse_received` expects in the header's `magic` field; frames
/// carrying anything else are dropped without a log line. Written
/// most-significant byte first, the bytes 0x52 0x54 0x54 0x50 are ASCII "RTTP".
const RTTP_MAGIC: u32 = 0x5254_5450;
/// Combined outcome of the checks performed by [`parallel_immune_scan`].
///
/// The type is aligned to 64 bytes via `repr(align(64))`. It is plain data
/// (booleans, one `f32`, one `u16`), so it derives `Debug`, `Clone`, `Copy`
/// and `PartialEq` to make results loggable, cheap to pass around and
/// comparable in tests.
#[derive(Debug, Clone, Copy, PartialEq)]
#[repr(align(64))]
pub struct ParallelScanResult {
    /// Outcome of the ROA proof verification over the header fingerprint.
    pub identity_ok: bool,
    /// Outcome of verifying the watermark extracted from the payload.
    pub watermark_ok: bool,
    /// `true` when the header checksum equals the checksum computed over the
    /// header and payload.
    pub hash_ok: bool,
    /// Score returned by the anomaly classifier; compared against
    /// `QUARANTINE_THRESHOLD` by `is_safe`.
    pub anomaly_score: f32,
    /// Set to `true` by `parallel_immune_scan`.
    // NOTE(review): nothing in this file ever sets this to `false` or reads
    // it — confirm where hive consensus is meant to be evaluated.
    pub hive_consensus_ok: bool,
    /// Quarantine reason code: `0` when no quarantine pulse was emitted,
    /// otherwise `0x01` (identity), `0x02` (watermark) or `0x04` (any other
    /// cause).
    pub reason: u16,
}
impl ParallelScanResult {
    /// Returns `true` only when the frame passed every check and was not
    /// quarantined.
    ///
    /// A result is safe when all of the following hold:
    /// - `identity_ok`, `watermark_ok` and `hash_ok` are all `true`;
    /// - `anomaly_score` is strictly below `QUARANTINE_THRESHOLD` (a NaN score
    ///   therefore counts as unsafe, because the comparison is `false`);
    /// - `reason` is `0`, i.e. no quarantine reason has been recorded.
    ///
    /// The `reason` check matters because a frame can be quarantined on the
    /// classifier's flag alone while every other field still looks clean;
    /// without it such a frame would be reported as safe after its sender had
    /// already been quarantined.
    pub fn is_safe(&self) -> bool {
        // NOTE(review): `hive_consensus_ok` is deliberately left out, matching
        // the previous behaviour — confirm whether it should gate safety too.
        let checks_passed = self.identity_ok && self.watermark_ok && self.hash_ok;
        let score_acceptable = self.anomaly_score < QUARANTINE_THRESHOLD;
        let not_quarantined = self.reason == 0;
        checks_passed && score_acceptable && not_quarantined
    }
}
/// Runs the four frame checks through nested `rayon::join` calls and folds
/// their outcomes into a single [`ParallelScanResult`].
///
/// The checks are paired exactly as two joined halves:
/// - ROA proof verification together with watermark extraction/verification;
/// - checksum comparison together with anomaly classification.
///
/// If the assembled result is not safe, or the classifier flagged the frame,
/// a reason code is stored in the result and a quarantine pulse is emitted for
/// the header's fingerprint. The code is `0x01` when the identity check
/// failed, `0x02` when identity passed but the watermark failed, and `0x04`
/// for every remaining cause.
///
/// # Parameters
/// - `header`: parsed frame header whose fields feed every check.
/// - `payload`: frame bytes following the header.
///
/// # Returns
/// The populated result; `reason` stays `0` when no quarantine was triggered.
pub fn parallel_immune_scan(header: &PulseFrameHeader, payload: &[u8]) -> ParallelScanResult {
    // The individual checks, named so the join tree below reads at a glance.
    let verify_identity =
        || crate::dag::MerkleDag::verify_roa_proof(&header.rpki_fingerprint, header.semantic_hash);
    let verify_watermark = || {
        let extracted = crate::watermark::extract(payload, &header.rpki_fingerprint);
        crate::watermark::verify(extracted, header.timestamp_ns)
    };
    let verify_checksum =
        || header.checksum == crate::crypto::compute_hardware_crc32(header, payload);
    let classify = || crate::anomaly::classify_intent_stream(header);

    let ((identity_ok, watermark_ok), (hash_ok, (anomaly_detected, score))) = rayon::join(
        || rayon::join(verify_identity, verify_watermark),
        || rayon::join(verify_checksum, classify),
    );

    let mut verdict = ParallelScanResult {
        identity_ok,
        watermark_ok,
        hash_ok,
        anomaly_score: score,
        hive_consensus_ok: true,
        reason: 0,
    };

    // Safety is evaluated while `reason` is still zero; the code is only
    // filled in once quarantine has been decided.
    let must_quarantine = anomaly_detected || !verdict.is_safe();
    if must_quarantine {
        verdict.reason = match (identity_ok, watermark_ok) {
            (false, _) => 0x01,
            (true, false) => 0x02,
            (true, true) => 0x04,
        };
        rttp::emit_quarantine_pulse(&header.rpki_fingerprint, verdict.reason);
    }

    verdict
}
/// Handles one raw received frame: validates its length and magic value, then
/// runs [`parallel_immune_scan`] over the header and payload.
///
/// Frames that are too short are reported on stderr and dropped. Frames whose
/// `magic` field differs from `RTTP_MAGIC`, and frames the scan reports as
/// unsafe, are dropped silently. No further processing of accepted frames
/// happens in this function.
///
/// # Parameters
/// - `frame`: the complete frame bytes; the payload is everything from offset
///   `RPKI_HEADER_SIZE` onwards.
pub fn on_pulse_received(frame: &[u8]) {
    // The header is copied out of the buffer below, so the buffer must cover a
    // whole `PulseFrameHeader` in addition to the fixed header prefix.
    let min_len = RPKI_HEADER_SIZE.max(std::mem::size_of::<PulseFrameHeader>());
    if frame.len() < min_len {
        eprintln!("\x1b[1;31m[RPKI-PATHOGEN]\x1b[0m Frame Underflow detected.");
        return;
    }

    // A byte slice carries no alignment guarantee, so forming a
    // `&PulseFrameHeader` directly into it would be undefined behaviour if the
    // header type needs more than byte alignment. Copy it out instead.
    //
    // SAFETY: `frame` holds at least `size_of::<PulseFrameHeader>()` readable
    // bytes (checked above), and `read_unaligned` places no alignment
    // requirement on the source pointer. As with the previous in-place cast,
    // this relies on every bit pattern being a valid `PulseFrameHeader`.
    let header: PulseFrameHeader =
        unsafe { std::ptr::read_unaligned(frame.as_ptr().cast::<PulseFrameHeader>()) };

    if header.magic != RTTP_MAGIC {
        return;
    }

    let payload = &frame[RPKI_HEADER_SIZE..];
    let scan_result = parallel_immune_scan(&header, payload);
    if !scan_result.is_safe() {
        // Any quarantine signalling already happened inside the scan.
        return;
    }
    // Frame accepted; nothing further is done with it here.
}