use crate::diagnostics::DegradationTracker;
use crate::traits::Language;
use crate::types::{ApiSurface, BehavioralBreak, StructuralChange};
use dashmap::DashMap;
use std::collections::HashSet;
use std::sync::Arc;
use tokio::sync::broadcast;
/// Capacity passed to `broadcast::channel` when a `SharedFindings` is created.
const BROADCAST_CAPACITY: usize = 4096;
/// Findings store that can be written and read through a shared reference.
///
/// Holds the structural and behavioral breaks recorded so far, the set-once
/// old/new API surfaces, a degradation tracker, and the sending half of a
/// broadcast channel that announces newly inserted structural breaks.
pub struct SharedFindings<L: Language> {
    /// Structural changes, keyed by the change's `qualified_name`.
    structural_breaks: DashMap<String, StructuralChange>,
    /// Behavioral breaks, keyed by the break's `symbol`.
    behavioral_breaks: DashMap<String, BehavioralBreak<L>>,
    /// Sender on which the qualified name of every inserted structural break is published.
    td_broadcast_tx: broadcast::Sender<String>,
    /// Old API surface; a set-once cell filled by `set_old_surface`.
    old_surface: tokio::sync::OnceCell<Arc<ApiSurface<L::SymbolData>>>,
    /// New API surface; a set-once cell filled by `set_new_surface`.
    new_surface: tokio::sync::OnceCell<Arc<ApiSurface<L::SymbolData>>>,
    /// Degradation tracker, reference-counted so it can be handed out via `degradation_arc`.
    degradation: Arc<DegradationTracker>,
}
impl<L: Language> SharedFindings<L> {
pub fn new() -> Self {
let (tx, _) = broadcast::channel(BROADCAST_CAPACITY);
Self {
structural_breaks: DashMap::new(),
behavioral_breaks: DashMap::new(),
td_broadcast_tx: tx,
old_surface: tokio::sync::OnceCell::new(),
new_surface: tokio::sync::OnceCell::new(),
degradation: Arc::new(DegradationTracker::new()),
}
}
pub fn insert_structural_break(&self, change: StructuralChange) {
let name = change.qualified_name.clone();
self.structural_breaks.insert(name.clone(), change);
let _ = self.td_broadcast_tx.send(name);
}
pub fn insert_structural_breaks(&self, changes: Vec<StructuralChange>) {
for change in changes {
self.insert_structural_break(change);
}
}
pub fn set_old_surface(&self, surface: Arc<ApiSurface<L::SymbolData>>) {
let _ = self.old_surface.set(surface);
}
pub fn set_new_surface(&self, surface: Arc<ApiSurface<L::SymbolData>>) {
let _ = self.new_surface.set(surface);
}
pub fn subscribe_to_td(&self) -> BuReceiver {
BuReceiver {
rx: self.td_broadcast_tx.subscribe(),
skip_set: HashSet::new(),
}
}
pub fn has_structural_break(&self, qualified_name: &str) -> bool {
self.structural_breaks.contains_key(qualified_name)
}
pub fn insert_behavioral_break(&self, brk: BehavioralBreak<L>) {
self.behavioral_breaks.insert(brk.symbol.clone(), brk);
}
pub fn structural_breaks(&self) -> &DashMap<String, StructuralChange> {
&self.structural_breaks
}
pub fn behavioral_breaks(&self) -> &DashMap<String, BehavioralBreak<L>> {
&self.behavioral_breaks
}
pub async fn get_old_surface(&self) -> &Arc<ApiSurface<L::SymbolData>> {
self.old_surface
.get_or_init(|| async { panic!("TD must set old_surface before BU reads it") })
.await
}
pub async fn get_new_surface(&self) -> &Arc<ApiSurface<L::SymbolData>> {
self.new_surface
.get_or_init(|| async { panic!("TD must set new_surface before BU reads it") })
.await
}
pub fn try_get_old_surface(&self) -> Option<&Arc<ApiSurface<L::SymbolData>>> {
self.old_surface.get()
}
pub fn try_get_new_surface(&self) -> Option<&Arc<ApiSurface<L::SymbolData>>> {
self.new_surface.get()
}
pub fn degradation(&self) -> &DegradationTracker {
&self.degradation
}
pub fn degradation_arc(&self) -> Arc<DegradationTracker> {
self.degradation.clone()
}
pub fn structural_break_count(&self) -> usize {
self.structural_breaks.len()
}
pub fn behavioral_break_count(&self) -> usize {
self.behavioral_breaks.len()
}
pub fn structural_break_names(&self) -> Vec<String> {
self.structural_breaks
.iter()
.map(|entry| entry.key().clone())
.collect()
}
}
impl<L: Language> Default for SharedFindings<L> {
fn default() -> Self {
Self::new()
}
}
/// Receiving side of the structural-break broadcast, with a local cache of
/// every name received so far.
pub struct BuReceiver {
    /// Channel on which qualified names of structural breaks arrive.
    rx: broadcast::Receiver<String>,
    /// Names drained from `rx` so far.
    skip_set: HashSet<String>,
}
impl BuReceiver {
pub fn drain_and_check(&mut self, qualified_name: &str) -> bool {
loop {
match self.rx.try_recv() {
Ok(name) => {
self.skip_set.insert(name);
}
Err(broadcast::error::TryRecvError::Empty) => break,
Err(broadcast::error::TryRecvError::Closed) => break,
Err(broadcast::error::TryRecvError::Lagged(n)) => {
tracing::warn!(
lagged_messages = n,
"BU broadcast receiver lagged; falling back to DashMap checks"
);
break;
}
}
}
self.skip_set.contains(qualified_name)
}
pub fn is_skipped(&self, qualified_name: &str) -> bool {
self.skip_set.contains(qualified_name)
}
pub fn skip_set_size(&self) -> usize {
self.skip_set.len()
}
}
/// Decides whether `qualified_name` already has a structural break recorded.
///
/// First drains `receiver` and checks its skip set; only if that misses is the
/// shared map in `shared` consulted.
pub fn should_skip_for_bu<L: Language>(
    shared: &SharedFindings<L>,
    receiver: &mut BuReceiver,
    qualified_name: &str,
) -> bool {
    if receiver.drain_and_check(qualified_name) {
        return true;
    }
    shared.has_structural_break(qualified_name)
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::test_support::TestLang;
    use crate::types::{ChangeSubject, StructuralChangeType, SymbolKind};
    use std::sync::Arc;

    /// Builds a breaking "function removed" structural change whose symbol and
    /// qualified name are both `name`.
    fn make_structural_change(name: &str) -> StructuralChange {
        let removed = StructuralChangeType::Removed(ChangeSubject::Symbol {
            kind: SymbolKind::Function,
        });
        StructuralChange {
            symbol: name.to_string(),
            qualified_name: name.to_string(),
            kind: SymbolKind::Function,
            package: None,
            change_type: removed,
            before: None,
            after: None,
            description: format!("{} was removed", name),
            is_breaking: true,
            impact: None,
            migration_target: None,
        }
    }

    /// Builds a test-delta behavioral break for `name`, caused by itself.
    fn make_behavioral_break(name: &str) -> BehavioralBreak<TestLang> {
        BehavioralBreak {
            symbol: name.to_string(),
            caused_by: name.to_string(),
            call_path: vec![name.to_string()],
            evidence_description: "TestDelta: test assertions changed".to_string(),
            confidence: 0.95,
            description: format!("{} behavior changed", name),
            category: None,
            evidence_type: crate::types::EvidenceType::TestDelta,
            is_internal_only: None,
        }
    }

    #[test]
    fn shared_findings_basic_operations() {
        let findings = SharedFindings::<TestLang>::new();
        assert_eq!(findings.structural_break_count(), 0);
        assert_eq!(findings.behavioral_break_count(), 0);

        findings.insert_structural_break(make_structural_change("foo"));
        assert_eq!(findings.structural_break_count(), 1);
        assert!(findings.has_structural_break("foo"));
        assert!(!findings.has_structural_break("bar"));

        findings.insert_behavioral_break(make_behavioral_break("bar"));
        assert_eq!(findings.behavioral_break_count(), 1);
    }

    #[test]
    fn shared_findings_batch_insert() {
        let findings = SharedFindings::<TestLang>::new();
        let names = ["a", "b", "c"];
        let batch: Vec<StructuralChange> = names
            .iter()
            .map(|name| make_structural_change(name))
            .collect();

        findings.insert_structural_breaks(batch);

        assert_eq!(findings.structural_break_count(), 3);
        for name in &names {
            assert!(findings.has_structural_break(name));
        }
    }

    #[test]
    fn broadcast_receiver_skip_set() {
        let findings = SharedFindings::<TestLang>::new();
        let mut rx = findings.subscribe_to_td();

        findings.insert_structural_break(make_structural_change("foo"));

        assert!(rx.drain_and_check("foo"));
        assert!(!rx.drain_and_check("bar"));
        assert!(rx.is_skipped("foo"));
        assert!(!rx.is_skipped("bar"));
    }

    #[test]
    fn broadcast_multiple_messages() {
        let findings = SharedFindings::<TestLang>::new();
        let mut rx = findings.subscribe_to_td();

        for name in &["alpha", "beta", "gamma"] {
            findings.insert_structural_break(make_structural_change(name));
        }

        // One drain picks up all three pending names.
        assert!(rx.drain_and_check("alpha"));
        assert!(rx.is_skipped("beta"));
        assert!(rx.is_skipped("gamma"));
        assert_eq!(rx.skip_set_size(), 3);
    }

    #[test]
    fn should_skip_combines_broadcast_and_dashmap() {
        let findings = SharedFindings::<TestLang>::new();
        // Inserted before subscribing: reachable only through the map.
        findings.insert_structural_break(make_structural_change("early"));
        let mut rx = findings.subscribe_to_td();
        // Inserted after subscribing: reachable through the broadcast too.
        findings.insert_structural_break(make_structural_change("late"));

        assert!(should_skip_for_bu(&findings, &mut rx, "early"));
        assert!(should_skip_for_bu(&findings, &mut rx, "late"));
        assert!(!should_skip_for_bu(&findings, &mut rx, "unknown"));
    }

    #[test]
    fn structural_break_names() {
        let findings = SharedFindings::<TestLang>::new();
        findings.insert_structural_break(make_structural_change("x"));
        findings.insert_structural_break(make_structural_change("y"));

        // Map iteration order is unspecified, so sort before comparing.
        let mut collected = findings.structural_break_names();
        collected.sort_unstable();
        assert_eq!(collected, vec!["x", "y"]);
    }

    #[test]
    fn surface_try_get_before_set() {
        let findings = SharedFindings::<TestLang>::new();
        assert!(findings.try_get_old_surface().is_none());
        assert!(findings.try_get_new_surface().is_none());
    }

    #[test]
    fn surface_set_and_get() {
        let findings = SharedFindings::<TestLang>::new();
        findings.set_old_surface(Arc::new(ApiSurface { symbols: vec![] }));

        let stored = findings.try_get_old_surface();
        assert!(stored.is_some());
        assert_eq!(stored.unwrap().symbols.len(), 0);
    }

    #[tokio::test]
    async fn surface_async_get() {
        let findings: Arc<SharedFindings<TestLang>> = Arc::new(SharedFindings::new());
        findings.set_new_surface(Arc::new(ApiSurface { symbols: vec![] }));

        let fetched = findings.get_new_surface().await;
        assert_eq!(fetched.symbols.len(), 0);
    }

    #[test]
    fn concurrent_inserts() {
        use std::thread;

        let findings: Arc<SharedFindings<TestLang>> = Arc::new(SharedFindings::new());

        // Spawn all ten writers before joining any of them.
        let workers: Vec<_> = (0..10)
            .map(|t| {
                let findings = Arc::clone(&findings);
                thread::spawn(move || {
                    for i in 0..100 {
                        let name = format!("fn_{}_{}", t, i);
                        findings.insert_structural_break(make_structural_change(&name));
                    }
                })
            })
            .collect();

        for worker in workers {
            worker.join().unwrap();
        }

        assert_eq!(findings.structural_break_count(), 1000);
    }
}