use std::collections::{HashMap, HashSet};
use super::{InvariantStatus, OperatorState, StatefulInvariant};
use crate::models::bundle::{SynthesisBundle, SynthesisPayload};
use crate::screening::HazardHit;
use super::InvariantId;
/// Identifiers for the stateful invariants defined in this module.
#[derive(Clone, Copy, Debug, Eq, Hash, PartialEq)]
pub enum StatefulInvariantId {
    /// The fragmentation-bypass invariant (`s1_fragmentation_bypass`).
    S1,
}

impl StatefulInvariantId {
    /// Returns the short textual label of this identifier, e.g. `"S1"`.
    pub fn as_str(&self) -> &'static str {
        match *self {
            Self::S1 => "S1",
        }
    }
}
/// Per-principal detector for DNA orders that were split into overlapping fragments.
///
/// Each DNA bundle is fingerprinted as a set of hashed k-mers. The fingerprint is
/// compared against the recent fingerprints submitted by the same principal.
#[derive(Debug)]
pub struct FragmentationBypassDetector {
    /// k-mer length, in bytes, used for fingerprinting.
    k: usize,
    /// Maximum number of past bundles remembered per principal.
    max_window: usize,
    /// Budget on the summed k-mer count of one principal's window. The newest
    /// bundle is always kept, even if it alone exceeds the budget.
    max_kmers_per_principal: usize,
    /// Jaccard similarity strictly above which a bundle is reported.
    similarity_threshold: f64,
    /// Principal -> k-mer fingerprints of its recent bundles, oldest first.
    windows: HashMap<String, Vec<HashSet<u64>>>,
}
impl Default for FragmentationBypassDetector {
    /// Builds a detector with the stock tuning: 24-mers, a 20-bundle history,
    /// a 200 000 k-mer budget per principal and a 0.4 Jaccard threshold.
    fn default() -> Self {
        let k = 24;
        let max_window = 20;
        let max_kmers_per_principal = 200_000;
        let similarity_threshold = 0.4;
        Self::new(k, max_window, max_kmers_per_principal, similarity_threshold)
    }
}
impl FragmentationBypassDetector {
    /// Creates a detector with explicit tuning parameters and no history.
    ///
    /// * `k` — length in bytes of the k-mers used to fingerprint a DNA sequence.
    ///   `0` disables fingerprinting, so every DNA bundle passes unrecorded.
    /// * `max_window` — maximum number of past bundles remembered per principal.
    /// * `max_kmers_per_principal` — budget on the summed k-mer count of one
    ///   principal's window. The newest bundle is always kept, even when it alone
    ///   exceeds the budget.
    /// * `similarity_threshold` — a bundle is reported when its Jaccard similarity
    ///   with the principal's history is strictly greater than this value.
    pub fn new(
        k: usize,
        max_window: usize,
        max_kmers_per_principal: usize,
        similarity_threshold: f64,
    ) -> Self {
        Self {
            k,
            max_window,
            max_kmers_per_principal,
            similarity_threshold,
            windows: HashMap::new(),
        }
    }

    /// Fingerprints `seq` as the set of 64-bit FNV-1a hashes of every length-`k`
    /// byte window. ASCII letters are folded to upper case first.
    ///
    /// Returns an empty set when `seq` is shorter than `k`. It also returns an
    /// empty set when `k == 0`, because a zero-length k-mer would give every
    /// sequence the same fingerprint.
    fn extract_kmers(&self, seq: &str) -> HashSet<u64> {
        if self.k == 0 {
            return HashSet::new();
        }
        // `windows` yields nothing when `seq` is shorter than `k`.
        seq.as_bytes()
            .windows(self.k)
            .map(Self::fnv1a_upper)
            .collect()
    }

    /// 64-bit FNV-1a hash of `kmer`. Each ASCII byte is upper-cased first, so the
    /// fingerprint is case-insensitive.
    fn fnv1a_upper(kmer: &[u8]) -> u64 {
        const FNV_OFFSET_BASIS: u64 = 0xcbf29ce484222325;
        const FNV_PRIME: u64 = 0x100000001b3;
        kmer.iter().fold(FNV_OFFSET_BASIS, |h, &b| {
            (h ^ u64::from(b.to_ascii_uppercase())).wrapping_mul(FNV_PRIME)
        })
    }

    /// Jaccard similarity `|a ∩ b| / |a ∪ b|`. Defined as `0.0` when both sets are empty.
    fn jaccard(a: &HashSet<u64>, b: &HashSet<u64>) -> f64 {
        let inter = a.intersection(b).count();
        // Inclusion–exclusion gives the union size without building the union.
        let union = a.len() + b.len() - inter;
        if union == 0 {
            0.0
        } else {
            inter as f64 / union as f64
        }
    }

    /// Union of every fingerprint currently held in `window`.
    fn union_of_window(window: &[HashSet<u64>]) -> HashSet<u64> {
        window.iter().flatten().copied().collect()
    }

    /// Total number of k-mers stored across `window`. Sets are counted individually,
    /// so duplicates across entries are counted more than once.
    fn total_kmers(window: &[HashSet<u64>]) -> usize {
        window.iter().map(HashSet::len).sum()
    }

    /// Evaluates a bundle from `principal` against that principal's recent history,
    /// then records the bundle's fingerprint.
    ///
    /// Returns:
    /// * `Pass` — the payload is not DNA, or the sequence yields no k-mers. Nothing
    ///   is recorded in either case.
    /// * `Pass` — this is the principal's first recorded bundle, or the similarity
    ///   is at or below the threshold.
    /// * `Fail` — the similarity with the union of the window exceeds the threshold
    ///   and `screening_hits` is non-empty. The prior history is discarded, and the
    ///   window restarts with only the current bundle.
    /// * `Advisory` — the similarity exceeds the threshold but there are no hits.
    ///
    /// NOTE(review): the comparison is against the union of the whole window. When a
    /// fragment is fully contained in that union, its score is `|current| / |union|`,
    /// which falls as the window grows. A containment score (`|A ∩ U| / |A|`) may fit
    /// the fragmentation use case better — confirm intent.
    pub fn evaluate_bundle(
        &mut self,
        bundle: &SynthesisBundle,
        principal: &str,
        screening_hits: &[HazardHit],
    ) -> InvariantStatus {
        let sequence = match &bundle.payload {
            SynthesisPayload::Dna { sequence } => sequence,
            _ => return InvariantStatus::Pass,
        };
        // `extract_kmers` case-folds byte by byte, so no upper-cased copy is needed.
        let current_kmers = self.extract_kmers(sequence);
        if current_kmers.is_empty() {
            return InvariantStatus::Pass;
        }
        let window = self.windows.entry(principal.to_string()).or_default();
        if !window.is_empty() {
            let sim = Self::jaccard(&current_kmers, &Self::union_of_window(window));
            if sim > self.similarity_threshold {
                let prior_count = window.len();
                if !screening_hits.is_empty() {
                    // Restart the history from this bundle once the bypass is reported.
                    window.clear();
                    window.push(current_kmers);
                    // Keep the caps consistent with the other paths, e.g. `max_window == 0`.
                    Self::evict_if_needed(window, self.max_window, self.max_kmers_per_principal);
                    return InvariantStatus::Fail {
                        reason: format!(
                            "fragmentation bypass: {sim:.2} Jaccard similarity with \
                             {prior_count} prior bundles from same principal, \
                             hazard hits present",
                        ),
                    };
                }
                window.push(current_kmers);
                Self::evict_if_needed(window, self.max_window, self.max_kmers_per_principal);
                return InvariantStatus::Advisory {
                    note: format!(
                        "high k-mer overlap ({sim:.2}) with {prior_count} prior bundles \
                         from same principal",
                    ),
                };
            }
        }
        window.push(current_kmers);
        Self::evict_if_needed(window, self.max_window, self.max_kmers_per_principal);
        InvariantStatus::Pass
    }

    /// Drops the oldest entries of `window` until it meets both caps:
    /// * at most `max_window` entries;
    /// * at most `max_kmers` k-mers in total, unless only one entry remains.
    fn evict_if_needed(window: &mut Vec<HashSet<u64>>, max_window: usize, max_kmers: usize) {
        // Count cap: remove every surplus entry from the front in a single drain.
        let surplus = window.len().saturating_sub(max_window);
        window.drain(..surplus);

        // K-mer budget: walk forward from the oldest entry with a running total,
        // instead of re-summing the window after every removal.
        let mut total = Self::total_kmers(window);
        let mut n_evict = 0;
        while total > max_kmers && window.len() - n_evict > 1 {
            total -= window[n_evict].len();
            n_evict += 1;
        }
        window.drain(..n_evict);
    }

    /// Number of principals with a history entry. This type never removes
    /// entries, so the count only grows.
    pub fn principal_count(&self) -> usize {
        self.windows.len()
    }

    /// Number of fingerprints currently remembered for `principal`. Returns `0`
    /// for a principal that has never been recorded.
    pub fn window_len(&self, principal: &str) -> usize {
        self.windows.get(principal).map_or(0, Vec::len)
    }
}
/// Read-only fragmentation check for the `StatefulInvariant` pipeline.
///
/// It only inspects the cached fingerprints in `OperatorState` and cannot update any
/// history. It therefore never fails a bundle. At most, it raises an advisory that
/// points at the mutable `FragmentationBypassDetector`.
pub struct S1FragmentationInvariant;

impl StatefulInvariant for S1FragmentationInvariant {
    fn id(&self) -> InvariantId {
        // NOTE(review): this S1 invariant reports `InvariantId::D1`, while this
        // module's `StatefulInvariantId::S1` is otherwise unused. Confirm whether
        // `InvariantId` should gain an S1 variant, or whether D1 is intentional.
        InvariantId::D1
    }

    fn name(&self) -> &'static str {
        "s1_fragmentation_bypass"
    }

    /// Returns `Pass` in three cases: the payload is not DNA, the sequence is
    /// shorter than 24 bytes, or the principal has no cached k-mer fingerprints.
    /// Otherwise it returns an `Advisory`.
    fn evaluate(&self, bundle: &SynthesisBundle, state: &OperatorState) -> InvariantStatus {
        // Mirrors the default k-mer length used by `FragmentationBypassDetector::default()`.
        const MIN_SEQUENCE_LEN: usize = 24;
        let too_short = match &bundle.payload {
            // ASCII upper-casing never changes byte length, so measure the raw sequence.
            SynthesisPayload::Dna { sequence } => sequence.len() < MIN_SEQUENCE_LEN,
            _ => return InvariantStatus::Pass,
        };
        if too_short || state.recent_kmers.is_empty() {
            return InvariantStatus::Pass;
        }
        InvariantStatus::Advisory {
            note: format!(
                "principal {} has {} cached k-mer fingerprints; \
                 run mutable FragmentationBypassDetector for full analysis",
                state.principal,
                state.recent_kmers.len()
            ),
        }
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::models::bundle::{BundleAuthority, SynthesisBundle, SynthesisPayload};

    /// Builds a DNA bundle around `seq`, with fixed test-only metadata.
    fn dna_bundle(seq: &str) -> SynthesisBundle {
        SynthesisBundle {
            timestamp: chrono::Utc::now(),
            source: "test".into(),
            sequence: 1,
            payload: SynthesisPayload::Dna {
                sequence: seq.into(),
            },
            delta_time: 0.0,
            authority: BundleAuthority {
                pca_chain: String::new(),
                required_ops: vec![],
            },
            metadata: Default::default(),
        }
    }

    /// One placeholder screening hit. The detector only checks whether any hits are present.
    fn fake_hazard_hit() -> HazardHit {
        use crate::screening::HazardEntry;
        HazardHit {
            entry: HazardEntry {
                id: "test-hazard".into(),
                label: "Test Hazard".into(),
                hazard_class: "test".into(),
                pattern: "ATGC".into(),
            },
            matched_text: "ATGC".into(),
        }
    }

    const SEQ_A: &str = "ATGCGATCGATCGATCGATCGATCGATCGATCGATCGATCGATCGATCGA";
    const SEQ_B: &str = "TTTTTTTTTTTTTTTTTTTTTTTTTTTTTTTTTTTTTTTTTTTTTTTTTT";

    #[test]
    fn empty_history_passes() {
        let mut det = FragmentationBypassDetector::default();
        let bundle = dna_bundle(SEQ_A);
        let status = det.evaluate_bundle(&bundle, "alice", &[]);
        assert_eq!(status, InvariantStatus::Pass);
        assert_eq!(det.window_len("alice"), 1);
    }

    /// Two dissimilar bundles both pass, and both are recorded.
    #[test]
    fn single_bundle_below_threshold_passes() {
        let mut det = FragmentationBypassDetector::default();
        let b1 = dna_bundle(SEQ_A);
        let b2 = dna_bundle(SEQ_B);
        let s1 = det.evaluate_bundle(&b1, "alice", &[]);
        let s2 = det.evaluate_bundle(&b2, "alice", &[]);
        assert_eq!(s1, InvariantStatus::Pass);
        assert_eq!(s2, InvariantStatus::Pass);
        assert_eq!(det.window_len("alice"), 2);
    }

    #[test]
    fn similar_bundles_advisory() {
        let mut det = FragmentationBypassDetector::new(4, 20, 200_000, 0.1);
        let seq = "ATGCGATCGATCGATCGATCGATCGATCGATCGATCGATCGATCGATCGA";
        let b1 = dna_bundle(&seq[..30]);
        let b2 = dna_bundle(&seq[5..35]);
        let s1 = det.evaluate_bundle(&b1, "alice", &[]);
        let s2 = det.evaluate_bundle(&b2, "alice", &[]);
        assert_eq!(s1, InvariantStatus::Pass, "first bundle should pass");
        assert!(
            matches!(s2, InvariantStatus::Advisory { .. }),
            "similar second bundle should be advisory, got: {s2:?}"
        );
    }

    #[test]
    fn lowercase_input_matches_uppercase_history() {
        let mut det = FragmentationBypassDetector::new(4, 20, 200_000, 0.1);
        let seq = "ATGCGATCGATCGATCGATCGATCGATCGATCGATCGATCGATCGATCGA";
        let _ = det.evaluate_bundle(&dna_bundle(&seq[..30]), "alice", &[]);
        let lower = seq[..30].to_ascii_lowercase();
        let status = det.evaluate_bundle(&dna_bundle(&lower), "alice", &[]);
        assert!(
            matches!(status, InvariantStatus::Advisory { .. }),
            "k-mer fingerprints should be case-insensitive, got: {status:?}"
        );
    }

    #[test]
    fn sequence_shorter_than_k_is_ignored() {
        let mut det = FragmentationBypassDetector::default();
        let status = det.evaluate_bundle(&dna_bundle("ATGC"), "alice", &[]);
        assert_eq!(status, InvariantStatus::Pass);
        assert_eq!(det.window_len("alice"), 0);
        assert_eq!(det.principal_count(), 0, "no history is created for short input");
    }

    #[test]
    fn principal_isolation() {
        let mut det = FragmentationBypassDetector::new(4, 20, 200_000, 0.1);
        let seq = "ATGCGATCGATCGATCGATCGATCGATCGATCGATCGATCGATCGATCGA";
        let b_alice = dna_bundle(&seq[..30]);
        let b_bob = dna_bundle(&seq[5..35]);
        let s_alice = det.evaluate_bundle(&b_alice, "alice", &[]);
        assert_eq!(s_alice, InvariantStatus::Pass);
        let s_bob = det.evaluate_bundle(&b_bob, "bob", &[]);
        assert_eq!(
            s_bob,
            InvariantStatus::Pass,
            "bob's window is independent of alice's"
        );
        assert_eq!(det.principal_count(), 2);
    }

    #[test]
    fn eviction_at_window_cap() {
        let max_window = 5;
        let mut det = FragmentationBypassDetector::new(4, max_window, 200_000, 0.99);
        // Every rotation of the ATGC repeat has the same 4-mer set. After the first
        // bundle, each one therefore takes the Advisory path, which also evicts.
        for i in 0..10usize {
            let bases = ['A', 'T', 'G', 'C'];
            let seq: String = (0..30).map(|j| bases[(i + j) % 4]).collect();
            let bundle = dna_bundle(&seq);
            let status = det.evaluate_bundle(&bundle, "alice", &[]);
            assert!(!matches!(status, InvariantStatus::Fail { .. }));
        }
        assert_eq!(
            det.window_len("alice"),
            max_window,
            "window should be trimmed to exactly the cap"
        );
    }

    #[test]
    fn eviction_at_kmer_budget_keeps_newest() {
        // Each homopolymer yields exactly one 4-mer, and the four are pairwise
        // disjoint, so every bundle passes.
        let mut det = FragmentationBypassDetector::new(4, 20, 2, 0.5);
        for seq in ["AAAAAAAA", "TTTTTTTT", "GGGGGGGG", "CCCCCCCC"] {
            let status = det.evaluate_bundle(&dna_bundle(seq), "alice", &[]);
            assert_eq!(status, InvariantStatus::Pass);
        }
        assert_eq!(det.window_len("alice"), 2, "only the two newest fit the budget");
    }

    #[test]
    fn dissimilar_sequences_pass() {
        let mut det = FragmentationBypassDetector::new(8, 20, 200_000, 0.01);
        let b1 = dna_bundle("AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA");
        let b2 = dna_bundle("TTTTTTTTTTTTTTTTTTTTTTTTTTTTTTTT");
        let s1 = det.evaluate_bundle(&b1, "alice", &[]);
        let s2 = det.evaluate_bundle(&b2, "alice", &[]);
        assert_eq!(s1, InvariantStatus::Pass);
        assert_eq!(s2, InvariantStatus::Pass);
    }

    #[test]
    fn fail_on_high_overlap_with_hazard_hits() {
        let mut det = FragmentationBypassDetector::new(4, 20, 200_000, 0.1);
        let seq = "ATGCGATCGATCGATCGATCGATCGATCGATCGATCGATCGATCGATCGA";
        let b1 = dna_bundle(&seq[..30]);
        let b2 = dna_bundle(&seq[5..35]);
        let _ = det.evaluate_bundle(&b1, "alice", &[]);
        let status = det.evaluate_bundle(&b2, "alice", &[fake_hazard_hit()]);
        assert!(
            matches!(status, InvariantStatus::Fail { .. }),
            "expected Fail, got {status:?}"
        );
        assert_eq!(
            det.window_len("alice"),
            1,
            "history restarts from the failing bundle"
        );
    }
}