1use crate::diagnostics::DegradationTracker;
25use crate::traits::Language;
26use crate::types::{ApiSurface, BehavioralBreak, StructuralChange};
27use dashmap::DashMap;
28use std::collections::HashSet;
29use std::sync::Arc;
30use tokio::sync::broadcast;
31
/// Capacity handed to `broadcast::channel` when a `SharedFindings` is built,
/// i.e. how many qualified names the channel retains for a slow receiver.
const BROADCAST_CAPACITY: usize = 4_096;
37
38pub struct SharedFindings<L: Language> {
46 structural_breaks: DashMap<String, StructuralChange>,
49
50 behavioral_breaks: DashMap<String, BehavioralBreak<L>>,
53
54 td_broadcast_tx: broadcast::Sender<String>,
62
63 old_surface: tokio::sync::OnceCell<Arc<ApiSurface<L::SymbolData>>>,
67
68 new_surface: tokio::sync::OnceCell<Arc<ApiSurface<L::SymbolData>>>,
70
71 degradation: Arc<DegradationTracker>,
74}
75
76impl<L: Language> SharedFindings<L> {
77 pub fn new() -> Self {
79 let (tx, _) = broadcast::channel(BROADCAST_CAPACITY);
80 Self {
81 structural_breaks: DashMap::new(),
82 behavioral_breaks: DashMap::new(),
83 td_broadcast_tx: tx,
84 old_surface: tokio::sync::OnceCell::new(),
85 new_surface: tokio::sync::OnceCell::new(),
86 degradation: Arc::new(DegradationTracker::new()),
87 }
88 }
89
90 pub fn insert_structural_break(&self, change: StructuralChange) {
98 let name = change.qualified_name.clone();
99 self.structural_breaks.insert(name.clone(), change);
100 let _ = self.td_broadcast_tx.send(name);
102 }
103
104 pub fn insert_structural_breaks(&self, changes: Vec<StructuralChange>) {
106 for change in changes {
107 self.insert_structural_break(change);
108 }
109 }
110
111 pub fn set_old_surface(&self, surface: Arc<ApiSurface<L::SymbolData>>) {
114 let _ = self.old_surface.set(surface);
115 }
116
117 pub fn set_new_surface(&self, surface: Arc<ApiSurface<L::SymbolData>>) {
119 let _ = self.new_surface.set(surface);
120 }
121
122 pub fn subscribe_to_td(&self) -> BuReceiver {
130 BuReceiver {
131 rx: self.td_broadcast_tx.subscribe(),
132 skip_set: HashSet::new(),
133 }
134 }
135
136 pub fn has_structural_break(&self, qualified_name: &str) -> bool {
141 self.structural_breaks.contains_key(qualified_name)
142 }
143
144 pub fn insert_behavioral_break(&self, brk: BehavioralBreak<L>) {
146 self.behavioral_breaks.insert(brk.symbol.clone(), brk);
147 }
148
149 pub fn structural_breaks(&self) -> &DashMap<String, StructuralChange> {
153 &self.structural_breaks
154 }
155
156 pub fn behavioral_breaks(&self) -> &DashMap<String, BehavioralBreak<L>> {
158 &self.behavioral_breaks
159 }
160
161 pub async fn get_old_surface(&self) -> &Arc<ApiSurface<L::SymbolData>> {
163 self.old_surface
164 .get_or_init(|| async { panic!("TD must set old_surface before BU reads it") })
165 .await
166 }
167
168 pub async fn get_new_surface(&self) -> &Arc<ApiSurface<L::SymbolData>> {
170 self.new_surface
171 .get_or_init(|| async { panic!("TD must set new_surface before BU reads it") })
172 .await
173 }
174
175 pub fn try_get_old_surface(&self) -> Option<&Arc<ApiSurface<L::SymbolData>>> {
179 self.old_surface.get()
180 }
181
182 pub fn try_get_new_surface(&self) -> Option<&Arc<ApiSurface<L::SymbolData>>> {
184 self.new_surface.get()
185 }
186
187 pub fn degradation(&self) -> &DegradationTracker {
193 &self.degradation
194 }
195
196 pub fn degradation_arc(&self) -> Arc<DegradationTracker> {
200 self.degradation.clone()
201 }
202
203 pub fn structural_break_count(&self) -> usize {
207 self.structural_breaks.len()
208 }
209
210 pub fn behavioral_break_count(&self) -> usize {
212 self.behavioral_breaks.len()
213 }
214
215 pub fn structural_break_names(&self) -> Vec<String> {
217 self.structural_breaks
218 .iter()
219 .map(|entry| entry.key().clone())
220 .collect()
221 }
222}
223
224impl<L: Language> Default for SharedFindings<L> {
225 fn default() -> Self {
226 Self::new()
227 }
228}
229
230pub struct BuReceiver {
241 rx: broadcast::Receiver<String>,
242 skip_set: HashSet<String>,
243}
244
245impl BuReceiver {
246 pub fn drain_and_check(&mut self, qualified_name: &str) -> bool {
253 loop {
255 match self.rx.try_recv() {
256 Ok(name) => {
257 self.skip_set.insert(name);
258 }
259 Err(broadcast::error::TryRecvError::Empty) => break,
260 Err(broadcast::error::TryRecvError::Closed) => break,
261 Err(broadcast::error::TryRecvError::Lagged(n)) => {
262 tracing::warn!(
265 lagged_messages = n,
266 "BU broadcast receiver lagged; falling back to DashMap checks"
267 );
268 break;
269 }
270 }
271 }
272
273 self.skip_set.contains(qualified_name)
274 }
275
276 pub fn is_skipped(&self, qualified_name: &str) -> bool {
279 self.skip_set.contains(qualified_name)
280 }
281
282 pub fn skip_set_size(&self) -> usize {
284 self.skip_set.len()
285 }
286}
287
288pub fn should_skip_for_bu<L: Language>(
293 shared: &SharedFindings<L>,
294 receiver: &mut BuReceiver,
295 qualified_name: &str,
296) -> bool {
297 receiver.drain_and_check(qualified_name) || shared.has_structural_break(qualified_name)
298}
299
#[cfg(test)]
mod tests {
    use super::*;
    use crate::test_support::TestLang;
    use crate::types::{ChangeSubject, StructuralChangeType, SymbolKind};
    use std::sync::Arc;

    /// Builds a breaking "function removed" change whose `symbol` and
    /// `qualified_name` are both `name`.
    fn make_structural_change(name: &str) -> StructuralChange {
        let removed_function = StructuralChangeType::Removed(ChangeSubject::Symbol {
            kind: SymbolKind::Function,
        });
        StructuralChange {
            symbol: name.to_string(),
            qualified_name: name.to_string(),
            kind: SymbolKind::Function,
            package: None,
            change_type: removed_function,
            before: None,
            after: None,
            description: format!("{} was removed", name),
            is_breaking: true,
            impact: None,
            migration_target: None,
        }
    }

    /// Builds a test-delta behavioral break for `name`, caused by itself.
    fn make_behavioral_break(name: &str) -> BehavioralBreak<TestLang> {
        let evidence_type = crate::types::EvidenceType::TestDelta;
        BehavioralBreak {
            symbol: name.to_string(),
            caused_by: name.to_string(),
            call_path: vec![name.to_string()],
            evidence_description: "TestDelta: test assertions changed".to_string(),
            confidence: 0.95,
            description: format!("{} behavior changed", name),
            category: None,
            evidence_type,
            is_internal_only: None,
        }
    }

    /// Counts start at zero and follow single inserts of each kind.
    #[test]
    fn shared_findings_basic_operations() {
        let findings = SharedFindings::<TestLang>::new();

        assert_eq!(findings.structural_break_count(), 0);
        assert_eq!(findings.behavioral_break_count(), 0);

        findings.insert_structural_break(make_structural_change("foo"));
        assert_eq!(findings.structural_break_count(), 1);
        assert!(findings.has_structural_break("foo"));
        assert!(!findings.has_structural_break("bar"));

        findings.insert_behavioral_break(make_behavioral_break("bar"));
        assert_eq!(findings.behavioral_break_count(), 1);
    }

    /// A batch insert records every change in the batch.
    #[test]
    fn shared_findings_batch_insert() {
        const NAMES: [&str; 3] = ["a", "b", "c"];
        let findings = SharedFindings::<TestLang>::new();

        let batch: Vec<StructuralChange> = NAMES
            .iter()
            .map(|name| make_structural_change(name))
            .collect();
        findings.insert_structural_breaks(batch);

        assert_eq!(findings.structural_break_count(), 3);
        for name in NAMES.iter() {
            assert!(findings.has_structural_break(name));
        }
    }

    /// A name broadcast after subscribing ends up in the receiver's skip set.
    #[test]
    fn broadcast_receiver_skip_set() {
        let findings = SharedFindings::<TestLang>::new();
        let mut rx = findings.subscribe_to_td();

        findings.insert_structural_break(make_structural_change("foo"));

        assert!(rx.drain_and_check("foo"));
        assert!(!rx.drain_and_check("bar"));

        assert!(rx.is_skipped("foo"));
        assert!(!rx.is_skipped("bar"));
    }

    /// One drain picks up every pending message, not just the queried one.
    #[test]
    fn broadcast_multiple_messages() {
        let findings = SharedFindings::<TestLang>::new();
        let mut rx = findings.subscribe_to_td();

        for name in ["alpha", "beta", "gamma"].iter() {
            findings.insert_structural_break(make_structural_change(name));
        }

        assert!(rx.drain_and_check("alpha"));
        assert!(rx.is_skipped("beta"));
        assert!(rx.is_skipped("gamma"));
        assert_eq!(rx.skip_set_size(), 3);
    }

    /// Names recorded before subscribing are found via the map; names
    /// recorded afterwards arrive via the broadcast.
    #[test]
    fn should_skip_combines_broadcast_and_dashmap() {
        let findings = SharedFindings::<TestLang>::new();

        findings.insert_structural_break(make_structural_change("early"));

        let mut rx = findings.subscribe_to_td();

        findings.insert_structural_break(make_structural_change("late"));

        assert!(should_skip_for_bu(&findings, &mut rx, "early"));
        assert!(should_skip_for_bu(&findings, &mut rx, "late"));
        assert!(!should_skip_for_bu(&findings, &mut rx, "unknown"));
    }

    /// The name listing contains exactly the inserted keys.
    #[test]
    fn structural_break_names() {
        let findings = SharedFindings::<TestLang>::new();
        findings.insert_structural_break(make_structural_change("x"));
        findings.insert_structural_break(make_structural_change("y"));

        // Map iteration order is unspecified, so sort before comparing.
        let mut listed = findings.structural_break_names();
        listed.sort();
        assert_eq!(listed, vec!["x", "y"]);
    }

    /// Both surfaces are absent on a fresh store.
    #[test]
    fn surface_try_get_before_set() {
        let findings = SharedFindings::<TestLang>::new();
        assert!(findings.try_get_old_surface().is_none());
        assert!(findings.try_get_new_surface().is_none());
    }

    /// A surface that was set is visible through the non-panicking getter.
    #[test]
    fn surface_set_and_get() {
        let findings = SharedFindings::<TestLang>::new();

        let empty_surface = Arc::new(ApiSurface { symbols: vec![] });
        findings.set_old_surface(empty_surface);
        assert!(findings.try_get_old_surface().is_some());
        assert_eq!(findings.try_get_old_surface().unwrap().symbols.len(), 0);
    }

    /// The async getter returns a surface that was set beforehand.
    #[tokio::test]
    async fn surface_async_get() {
        let findings: Arc<SharedFindings<TestLang>> = Arc::new(SharedFindings::new());

        let empty_surface = Arc::new(ApiSurface { symbols: vec![] });
        findings.set_new_surface(empty_surface);

        let fetched = findings.get_new_surface().await;
        assert_eq!(fetched.symbols.len(), 0);
    }

    /// Ten threads inserting one hundred distinct names each yield one
    /// thousand recorded changes.
    #[test]
    fn concurrent_inserts() {
        use std::thread;

        let findings: Arc<SharedFindings<TestLang>> = Arc::new(SharedFindings::new());

        // Collect eagerly so every thread is spawned before any is joined.
        let workers: Vec<_> = (0..10)
            .map(|t| {
                let findings = Arc::clone(&findings);
                thread::spawn(move || {
                    for i in 0..100 {
                        let name = format!("fn_{}_{}", t, i);
                        findings.insert_structural_break(make_structural_change(&name));
                    }
                })
            })
            .collect();

        for worker in workers {
            worker.join().unwrap();
        }

        assert_eq!(findings.structural_break_count(), 1000);
    }
}