1use crate::grammar::{GrammarState, ReasonCode};
50
/// Maximum number of swarm votes processed in one consensus round.
///
/// Every fixed-size working buffer in this module has this capacity; votes
/// beyond the first `MAX_SWARM_NODES` entries of an input slice are ignored.
pub const MAX_SWARM_NODES: usize = 64;

/// Robust z-score above which a vote's DSA score is treated as an outlier and
/// the vote is quarantined.
pub const KS_REJECT_THRESHOLD: f32 = 3.5;

/// Minimum fraction of the submitted votes that must survive authentication
/// and outlier quarantine before a quorum is declared.
pub const QUORUM_MIN_FRACTION: f32 = 0.67;

/// Minimum weighted share the modal grammar state must hold before
/// `consensus_grammar_state` reports it as the swarm consensus.
pub const CONSENSUS_THRESHOLD: f32 = 0.5;
65
66#[derive(Debug, Clone, Copy)]
70pub struct GrammarVote {
71 pub node_id: u8,
73 pub state: GrammarState,
75 pub dsa_score: f32,
77 pub episode_count: u32,
79 pub hardware_authenticated: bool,
81}
82
83#[derive(Debug, Clone, Copy)]
87pub struct SwarmConsensus {
88 pub p_admissible: f32,
90 pub p_boundary: f32,
92 pub p_violation: f32,
94 pub modal_state: GrammarState,
96 pub quorum_reached: bool,
98 pub votes_admitted: u8,
100 pub votes_quarantined: u8,
102 pub votes_unauthenticated: u8,
104 pub consensus_dsa_score: f32,
106 pub auth_required: bool,
108}
109
110impl SwarmConsensus {
111 pub const fn no_quorum() -> Self {
114 Self {
115 p_admissible: 1.0,
116 p_boundary: 0.0,
117 p_violation: 0.0,
118 modal_state: GrammarState::Admissible,
119 quorum_reached: false,
120 votes_admitted: 0,
121 votes_quarantined: 0,
122 votes_unauthenticated: 0,
123 consensus_dsa_score: 0.0,
124 auth_required: false,
125 }
126 }
127}
128
129pub fn compute_consensus(
142 votes: &[GrammarVote],
143 bft_f: u8,
144 require_auth: bool,
145) -> SwarmConsensus {
146 if votes.is_empty() {
147 return SwarmConsensus::no_quorum();
148 }
149
150 let (admitted_buf, admitted_count, n_unauth) = authenticate_votes(votes, require_auth);
151 if admitted_count == 0 {
152 return SwarmConsensus {
153 quorum_reached: false,
154 votes_unauthenticated: n_unauth,
155 auth_required: require_auth,
156 ..SwarmConsensus::no_quorum()
157 };
158 }
159 let admitted = &admitted_buf[..admitted_count];
160
161 let (final_buf, final_count, n_quarantined) = quarantine_outliers(admitted);
162 let final_votes = &final_buf[..final_count];
163
164 let n_total = votes.len().min(MAX_SWARM_NODES);
165 let quorum_needed = (2 * bft_f as usize + 1).max(1);
166 let quorum_fraction = final_count as f32 / n_total.max(1) as f32;
167 let quorum_reached = final_count >= quorum_needed
168 && quorum_fraction >= QUORUM_MIN_FRACTION;
169
170 if !quorum_reached || final_votes.is_empty() {
171 return SwarmConsensus {
172 quorum_reached: false,
173 votes_admitted: admitted_count as u8,
174 votes_quarantined: n_quarantined,
175 votes_unauthenticated: n_unauth,
176 auth_required: require_auth,
177 ..SwarmConsensus::no_quorum()
178 };
179 }
180
181 tally_consensus(final_votes, require_auth, n_quarantined, n_unauth, quorum_reached)
182}
183
184fn authenticate_votes(
185 votes: &[GrammarVote],
186 require_auth: bool,
187) -> ([GrammarVote; MAX_SWARM_NODES], usize, u8) {
188 let mut admitted_buf = [GrammarVote {
189 node_id: 0, state: GrammarState::Admissible,
190 dsa_score: 0.0, episode_count: 0, hardware_authenticated: false,
191 }; MAX_SWARM_NODES];
192 let mut admitted_count = 0usize;
193 let mut n_unauth = 0u8;
194
195 for vote in votes.iter().take(MAX_SWARM_NODES) {
196 if require_auth && !vote.hardware_authenticated {
197 n_unauth = n_unauth.saturating_add(1);
198 continue;
199 }
200 if admitted_count < MAX_SWARM_NODES {
201 admitted_buf[admitted_count] = *vote;
202 admitted_count += 1;
203 }
204 }
205 (admitted_buf, admitted_count, n_unauth)
206}
207
/// Sorts `values` in place (ascending) and returns their median.
///
/// An empty slice yields `0.0`. For an even number of elements the median is
/// the mean of the two middle values. The sort is an in-place insertion sort,
/// so no allocation takes place.
fn insertion_sort_median(values: &mut [f32]) -> f32 {
    if values.is_empty() {
        return 0.0;
    }

    // Grow a sorted prefix one element at a time, bubbling each new element
    // left until its predecessor is no longer greater.
    for sorted_len in 1..values.len() {
        let mut pos = sorted_len;
        while pos > 0 && values[pos - 1] > values[pos] {
            values.swap(pos - 1, pos);
            pos -= 1;
        }
    }

    let mid = values.len() / 2;
    match values.len() % 2 {
        1 => values[mid],
        _ => (values[mid - 1] + values[mid]) * 0.5,
    }
}
222
223fn quarantine_outliers(
224 admitted: &[GrammarVote],
225) -> ([GrammarVote; MAX_SWARM_NODES], usize, u8) {
226 const MAD_SCALE: f32 = 1.482_602_2;
227 let admitted_count = admitted.len();
228
229 let mut sorted_scores = [0.0f32; MAX_SWARM_NODES];
230 for (i, v) in admitted.iter().enumerate() {
231 sorted_scores[i] = v.dsa_score;
232 }
233 let median_dsa = insertion_sort_median(&mut sorted_scores[..admitted_count]);
234
235 let mut abs_devs = [0.0f32; MAX_SWARM_NODES];
236 for (i, v) in admitted.iter().enumerate() {
237 abs_devs[i] = (v.dsa_score - median_dsa).abs();
238 }
239 let mad = insertion_sort_median(&mut abs_devs[..admitted_count]);
240 let robust_sigma = (MAD_SCALE * mad).max(1e-9);
241
242 let mut final_buf = [GrammarVote {
243 node_id: 0, state: GrammarState::Admissible,
244 dsa_score: 0.0, episode_count: 0, hardware_authenticated: false,
245 }; MAX_SWARM_NODES];
246 let mut final_count = 0usize;
247 let mut n_quarantined = 0u8;
248
249 for vote in admitted {
250 let z = (vote.dsa_score - median_dsa).abs() / robust_sigma;
251 if z > KS_REJECT_THRESHOLD {
252 n_quarantined = n_quarantined.saturating_add(1);
253 } else if final_count < MAX_SWARM_NODES {
254 final_buf[final_count] = *vote;
255 final_count += 1;
256 }
257 }
258 (final_buf, final_count, n_quarantined)
259}
260
261fn tally_consensus(
262 final_votes: &[GrammarVote],
263 require_auth: bool,
264 n_quarantined: u8,
265 n_unauth: u8,
266 quorum_reached: bool,
267) -> SwarmConsensus {
268 let total_weight: f32 = final_votes.iter()
269 .map(|v| v.episode_count as f32)
270 .sum::<f32>()
271 .max(1.0);
272
273 let w_admissible: f32 = final_votes.iter()
274 .filter(|v| v.state == GrammarState::Admissible)
275 .map(|v| v.episode_count as f32).sum();
276 let w_boundary: f32 = final_votes.iter()
277 .filter(|v| v.state.is_boundary())
278 .map(|v| v.episode_count as f32).sum();
279 let w_violation: f32 = final_votes.iter()
280 .filter(|v| v.state == GrammarState::Violation)
281 .map(|v| v.episode_count as f32).sum();
282
283 let p_admissible = w_admissible / total_weight;
284 let p_boundary = w_boundary / total_weight;
285 let p_violation = w_violation / total_weight;
286
287 let modal_state = if p_admissible >= p_boundary && p_admissible >= p_violation {
288 GrammarState::Admissible
289 } else if p_boundary >= p_violation {
290 GrammarState::Boundary(ReasonCode::SustainedOutwardDrift)
291 } else {
292 GrammarState::Violation
293 };
294
295 let consensus_dsa_score = final_votes.iter()
296 .map(|v| v.dsa_score * v.episode_count as f32)
297 .sum::<f32>() / total_weight;
298
299 SwarmConsensus {
300 p_admissible, p_boundary, p_violation, modal_state, quorum_reached,
301 votes_admitted: final_votes.len() as u8,
302 votes_quarantined: n_quarantined,
303 votes_unauthenticated: n_unauth,
304 consensus_dsa_score,
305 auth_required: require_auth,
306 }
307}
308
309pub fn consensus_grammar_state(c: &SwarmConsensus) -> Option<GrammarState> {
314 if !c.quorum_reached { return None; }
315 let p_modal = match c.modal_state {
316 GrammarState::Admissible => c.p_admissible,
317 GrammarState::Boundary(_) => c.p_boundary,
318 GrammarState::Violation => c.p_violation,
319 };
320 if p_modal >= CONSENSUS_THRESHOLD { Some(c.modal_state) } else { None }
321}
322
/// Governance classification attached to each node's vote in a swarm report.
///
/// The variant is chosen by `assign_governance_tag` from the node's admission
/// status, its local grammar severity and the consensus severity.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
pub enum GovernanceTag {
    /// No governance condition applies to the node.
    Nominal,
    /// The node's vote was excluded: it failed the authentication filter or
    /// was quarantined as a DSA-score outlier.
    ObserverQuarantined,
    /// The node reports severity 0 locally while the consensus severity is
    /// at least 1.
    MissedAlarm,
    /// The node was quarantined as an outlier while reporting a local
    /// severity of at least 1 and the consensus severity is 0.
    LocalHardwareAnomaly,
    /// The node is flagged in the LO-precursor bitmask and reports a local
    /// severity of at least 1.
    LoInstabilityPrecursor,
}
385
386impl GovernanceTag {
387 pub const fn label(self) -> &'static str {
389 match self {
390 GovernanceTag::Nominal =>
391 "[Governance]: Nominal",
392 GovernanceTag::ObserverQuarantined =>
393 "[Governance]: Observer_Quarantined | Reason: DSA_Outlier",
394 GovernanceTag::MissedAlarm =>
395 "[Governance]: Missed_Alarm | Consensus: Boundary_Or_Violation",
396 GovernanceTag::LocalHardwareAnomaly =>
397 "[Governance]: Local_Hardware_Anomaly_Detected | Consensus: Admissible",
398 GovernanceTag::LoInstabilityPrecursor =>
399 "[Governance]: LO_Instability_Precursor | Review: Advisory",
400 }
401 }
402
403 #[inline]
405 pub const fn requires_action(self) -> bool {
406 !matches!(self, GovernanceTag::Nominal)
407 }
408}
409
410#[derive(Debug, Clone, Copy)]
415#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
416pub struct NodeGovernanceReport {
417 pub node_id: u8,
419 pub tag: GovernanceTag,
421 pub robust_z: f32,
424 pub admitted: bool,
426 pub local_grammar_severity: u8,
428}
429
430impl NodeGovernanceReport {
431 #[inline]
433 pub fn requires_action(&self) -> bool {
434 self.tag.requires_action()
435 }
436}
437
438pub fn swarm_governance_report(
484 votes: &[GrammarVote],
485 bft_f: u8,
486 require_auth: bool,
487 lo_precursor_nodes: u64,
488) -> ([NodeGovernanceReport; MAX_SWARM_NODES], usize, SwarmConsensus) {
489 let mut reports = [blank_report(); MAX_SWARM_NODES];
490 let n_votes = votes.len().min(MAX_SWARM_NODES);
491 if n_votes == 0 {
492 return (reports, 0, SwarmConsensus::no_quorum());
493 }
494
495 let admitted_flags = collect_admitted_flags(votes, require_auth);
496 let (median_dsa, robust_sigma) = compute_median_and_mad(votes, &admitted_flags);
497 let consensus = compute_consensus(votes, bft_f, require_auth);
498 let cons_sev = consensus.modal_state.severity();
499
500 for (i, vote) in votes.iter().take(MAX_SWARM_NODES).enumerate() {
501 reports[i] = build_node_report(
502 vote,
503 admitted_flags[i],
504 median_dsa,
505 robust_sigma,
506 lo_precursor_nodes,
507 cons_sev,
508 );
509 }
510
511 (reports, n_votes, consensus)
512}
513
514#[inline]
515fn blank_report() -> NodeGovernanceReport {
516 NodeGovernanceReport {
517 node_id: 0,
518 tag: GovernanceTag::Nominal,
519 robust_z: 0.0,
520 admitted: false,
521 local_grammar_severity: 0,
522 }
523}
524
525fn collect_admitted_flags(votes: &[GrammarVote], require_auth: bool) -> [bool; MAX_SWARM_NODES] {
526 let mut admitted_flags = [false; MAX_SWARM_NODES];
527 for (i, vote) in votes.iter().take(MAX_SWARM_NODES).enumerate() {
528 admitted_flags[i] = !require_auth || vote.hardware_authenticated;
529 }
530 admitted_flags
531}
532
533fn compute_median_and_mad(
534 votes: &[GrammarVote],
535 admitted_flags: &[bool; MAX_SWARM_NODES],
536) -> (f32, f32) {
537 const MAD_SCALE: f32 = 1.482_602_2;
538
539 let mut sorted_buf = [0.0f32; MAX_SWARM_NODES];
540 let mut n_admitted = 0usize;
541 for (i, vote) in votes.iter().take(MAX_SWARM_NODES).enumerate() {
542 if admitted_flags[i] {
543 sorted_buf[n_admitted] = vote.dsa_score;
544 n_admitted += 1;
545 }
546 }
547 insertion_sort(&mut sorted_buf[..n_admitted]);
548 let median_dsa = median_of_sorted(&sorted_buf[..n_admitted]);
549
550 let mut abs_devs = [0.0f32; MAX_SWARM_NODES];
551 let mut m = 0usize;
552 for (i, vote) in votes.iter().take(MAX_SWARM_NODES).enumerate() {
553 if admitted_flags[i] {
554 abs_devs[m] = (vote.dsa_score - median_dsa).abs();
555 m += 1;
556 }
557 }
558 insertion_sort(&mut abs_devs[..m]);
559 let mad = median_of_sorted(&abs_devs[..m]);
560 let robust_sigma = (MAD_SCALE * mad).max(1e-9);
561
562 (median_dsa, robust_sigma)
563}
564
/// Sorts `buf` in ascending order, in place, using insertion sort.
///
/// No allocation takes place. Elements are only moved past a neighbour that
/// compares strictly greater, so equal values keep their relative order.
#[inline]
fn insertion_sort(buf: &mut [f32]) {
    let mut sorted_len = 1;
    while sorted_len < buf.len() {
        // Bubble the first unsorted element left into the sorted prefix.
        let mut pos = sorted_len;
        while pos > 0 && buf[pos - 1] > buf[pos] {
            buf.swap(pos - 1, pos);
            pos -= 1;
        }
        sorted_len += 1;
    }
}
577
/// Returns the median of an already sorted slice.
///
/// An empty slice yields `0.0`; an even-length slice yields the mean of its
/// two middle elements. The slice is not checked for being sorted.
#[inline]
fn median_of_sorted(buf: &[f32]) -> f32 {
    let mid = buf.len() / 2;
    match (buf.len(), buf.len() % 2) {
        (0, _) => 0.0,
        (_, 1) => buf[mid],
        _ => (buf[mid - 1] + buf[mid]) * 0.5,
    }
}
589
590fn build_node_report(
591 vote: &GrammarVote,
592 admitted: bool,
593 median_dsa: f32,
594 robust_sigma: f32,
595 lo_precursor_nodes: u64,
596 cons_sev: u8,
597) -> NodeGovernanceReport {
598 let z = if admitted {
599 (vote.dsa_score - median_dsa).abs() / robust_sigma
600 } else {
601 0.0
602 };
603 let quarantined = admitted && z > KS_REJECT_THRESHOLD;
604 let is_lo = lo_precursor_nodes & (1u64 << (vote.node_id.min(63) as u64)) != 0;
605 let local_sev = vote.state.severity();
606 let tag = assign_governance_tag(admitted, quarantined, is_lo, local_sev, cons_sev);
607
608 NodeGovernanceReport {
609 node_id: vote.node_id,
610 tag,
611 robust_z: z,
612 admitted: admitted && !quarantined,
613 local_grammar_severity: local_sev,
614 }
615}
616
617#[inline]
618fn assign_governance_tag(
619 admitted: bool,
620 quarantined: bool,
621 is_lo: bool,
622 local_sev: u8,
623 cons_sev: u8,
624) -> GovernanceTag {
625 if !admitted {
626 GovernanceTag::ObserverQuarantined
627 } else if quarantined {
628 if local_sev >= 1 && cons_sev == 0 {
629 GovernanceTag::LocalHardwareAnomaly
630 } else {
631 GovernanceTag::ObserverQuarantined
632 }
633 } else if is_lo && local_sev >= 1 {
634 GovernanceTag::LoInstabilityPrecursor
635 } else if local_sev == 0 && cons_sev >= 1 {
636 GovernanceTag::MissedAlarm
637 } else {
638 GovernanceTag::Nominal
639 }
640}
641
642
#[cfg(test)]
mod tests {
    use super::*;

    /// Shorthand constructor for the votes used throughout these tests.
    fn vote(id: u8, state: GrammarState, dsa: f32, epi: u32, auth: bool) -> GrammarVote {
        GrammarVote {
            node_id: id,
            state,
            dsa_score: dsa,
            episode_count: epi,
            hardware_authenticated: auth,
        }
    }

    /// Five agreeing nodes reach quorum on `Admissible`.
    #[test]
    fn unanimous_admissible_consensus() {
        let ballots: [GrammarVote; 5] = [
            vote(0, GrammarState::Admissible, 1.0, 10, true),
            vote(1, GrammarState::Admissible, 0.9, 12, true),
            vote(2, GrammarState::Admissible, 1.1, 8, true),
            vote(3, GrammarState::Admissible, 1.0, 11, true),
            vote(4, GrammarState::Admissible, 0.95, 9, true),
        ];
        let outcome = compute_consensus(&ballots, 1, false);
        assert!(outcome.quorum_reached, "quorum must be reached");
        assert_eq!(outcome.modal_state, GrammarState::Admissible);
        assert!(
            outcome.p_admissible > 0.95,
            "nearly all admissible: {}",
            outcome.p_admissible
        );
    }

    /// An empty vote set falls back to the no-quorum default.
    #[test]
    fn no_quorum_on_empty_votes() {
        let outcome = compute_consensus(&[], 1, false);
        assert!(!outcome.quorum_reached);
        assert_eq!(
            outcome.modal_state,
            GrammarState::Admissible,
            "safe default when no quorum"
        );
    }

    /// A single wildly deviating DSA score is quarantined and cannot swing
    /// the verdict.
    #[test]
    fn byzantine_outlier_quarantined() {
        let ballots = [
            vote(0, GrammarState::Admissible, 1.0, 10, true),
            vote(1, GrammarState::Admissible, 1.1, 10, true),
            vote(2, GrammarState::Admissible, 0.9, 10, true),
            vote(3, GrammarState::Admissible, 1.05, 10, true),
            vote(4, GrammarState::Violation, 1000.0, 10, true),
        ];
        let outcome = compute_consensus(&ballots, 1, false);
        assert!(
            outcome.votes_quarantined >= 1,
            "Byzantine vote must be quarantined: {:?}",
            outcome
        );
        assert_eq!(
            outcome.modal_state,
            GrammarState::Admissible,
            "consensus must remain Admissible after quarantine"
        );
    }

    /// The episode-weighted majority decides the modal state.
    #[test]
    fn majority_violation_consensus() {
        let boundary = GrammarState::Boundary(ReasonCode::SustainedOutwardDrift);
        let ballots = [
            vote(0, GrammarState::Violation, 4.5, 20, true),
            vote(1, GrammarState::Violation, 4.8, 18, true),
            vote(2, GrammarState::Violation, 4.3, 22, true),
            vote(3, boundary, 2.5, 15, true),
            vote(4, GrammarState::Admissible, 1.0, 10, true),
        ];
        let outcome = compute_consensus(&ballots, 1, false);
        assert!(outcome.quorum_reached);
        assert_eq!(
            outcome.modal_state,
            GrammarState::Violation,
            "Violation majority: p_v={:.2}",
            outcome.p_violation
        );
    }

    /// With authentication required, an unauthenticated vote is counted and
    /// ignored.
    #[test]
    fn auth_filter_excludes_unauthenticated() {
        let ballots = [
            vote(0, GrammarState::Violation, 5.0, 20, false),
            vote(1, GrammarState::Admissible, 1.0, 10, true),
            vote(2, GrammarState::Admissible, 0.9, 12, true),
            vote(3, GrammarState::Admissible, 1.1, 11, true),
        ];
        let outcome = compute_consensus(&ballots, 1, true);
        assert_eq!(outcome.votes_unauthenticated, 1, "one unauth vote");
        assert_eq!(
            outcome.modal_state,
            GrammarState::Admissible,
            "unauthenticated Violation vote must be excluded"
        );
    }

    /// The modal state is only reported once its share reaches the threshold.
    #[test]
    fn consensus_grammar_state_requires_threshold() {
        let weak = SwarmConsensus {
            quorum_reached: true,
            modal_state: GrammarState::Boundary(ReasonCode::SustainedOutwardDrift),
            p_boundary: 0.4,
            ..SwarmConsensus::no_quorum()
        };
        assert!(
            consensus_grammar_state(&weak).is_none(),
            "below threshold: must return None"
        );

        let strong = SwarmConsensus { p_boundary: 0.6, ..weak };
        let result = consensus_grammar_state(&strong);
        assert!(
            matches!(result, Some(s) if s.is_boundary()),
            "boundary consensus"
        );
    }

    /// Three votes cannot satisfy the `2 * 2 + 1` quorum for `bft_f = 2`.
    #[test]
    fn too_few_nodes_no_quorum() {
        let ballots = [
            vote(0, GrammarState::Admissible, 1.0, 10, true),
            vote(1, GrammarState::Admissible, 0.9, 10, true),
            vote(2, GrammarState::Admissible, 1.1, 10, true),
        ];
        let outcome = compute_consensus(&ballots, 2, false);
        assert!(!outcome.quorum_reached, "3 votes insufficient for bft_f=2");
    }
}