Skip to main content

semver_analyzer_core/
shared.rs

1//! Concurrent shared state for TD/BU pipeline coordination.
2//!
3//! `SharedFindings` is the central coordination point between the TD (Top-Down)
4//! and BU (Bottom-Up) pipelines running concurrently. It provides:
5//!
6//! 1. **DashMap** for thread-safe structural and behavioral break storage
7//! 2. **Broadcast channel** for real-time TD→BU notifications (avoids
8//!    redundant LLM calls on symbols TD has already identified)
9//! 3. **OnceCell** for the API surfaces extracted by TD (BU reads these
10//!    for visibility resolution)
11//!
12//! ## Coordination Protocol
13//!
14//! ```text
15//! TD pipeline:                         BU pipeline:
16//! 1. Extract API surface at ref A      1. Parse git diff
17//! 2. Extract API surface at ref B      2. Extract changed functions
18//! 3. diff_surfaces()                   3. For each function:
19//! 4. For each structural break:           a. Drain broadcast channel
20//!    a. Insert into DashMap               b. Check DashMap + skip set
21//!    b. Broadcast qualified_name          c. If not found: analyze
22//! ```
23
24use crate::diagnostics::DegradationTracker;
25use crate::traits::Language;
26use crate::types::{ApiSurface, BehavioralBreak, StructuralChange};
27use dashmap::DashMap;
28use std::collections::HashSet;
29use std::sync::Arc;
30use tokio::sync::broadcast;
31
/// Capacity of the TD→BU broadcast channel.
///
/// Chosen to comfortably hold the structural findings of a typical
/// project's API surface. If TD sends more names than this before BU
/// drains its receiver, the oldest undelivered names are overwritten and
/// BU observes a lag. No finding is lost, because every finding is also
/// stored in the DashMap that BU consults directly as a fallback.
const BROADCAST_CAPACITY: usize = 4_096;
37
38/// Concurrent shared state between TD and BU pipelines.
39///
40/// Generic over `L: Language` so that `BehavioralBreak<L>` carries
41/// typed category data instead of stringly-typed labels.
42///
43/// Thread-safe: all fields use concurrent data structures.
44/// Wrapped in `Arc` for sharing between async tasks.
45pub struct SharedFindings<L: Language> {
46    /// Structural breaks found by TD. Keyed by qualified_name.
47    /// TD inserts, BU checks before analyzing each function.
48    structural_breaks: DashMap<String, StructuralChange>,
49
50    /// Behavioral breaks found by BU. Keyed by qualified_name.
51    /// BU inserts after spec inference confirms a behavioral change.
52    behavioral_breaks: DashMap<String, BehavioralBreak<L>>,
53
54    /// Broadcast sender: TD sends qualified names here as it finds
55    /// structural breaks. BU subscribes and drains pending messages
56    /// into a local skip set before each function analysis.
57    ///
58    /// This avoids BU redundantly analyzing functions that TD is about
59    /// to (or just did) flag structurally. The broadcast is "best effort" —
60    /// BU also checks the DashMap directly as a fallback.
61    td_broadcast_tx: broadcast::Sender<String>,
62
63    /// API surface from the OLD ref (set by TD after extraction).
64    /// Stored as `Arc` so consumers can cheaply share the surface
65    /// across concurrent tasks without deep-cloning.
66    old_surface: tokio::sync::OnceCell<Arc<ApiSurface<L::SymbolData>>>,
67
68    /// API surface from the NEW ref (set by TD after extraction).
69    new_surface: tokio::sync::OnceCell<Arc<ApiSurface<L::SymbolData>>>,
70
71    /// Non-fatal issues that degraded analysis quality.
72    /// Accessible to all pipeline phases via `degradation()`.
73    degradation: Arc<DegradationTracker>,
74}
75
76impl<L: Language> SharedFindings<L> {
77    /// Create new shared state with an empty broadcast channel.
78    pub fn new() -> Self {
79        let (tx, _) = broadcast::channel(BROADCAST_CAPACITY);
80        Self {
81            structural_breaks: DashMap::new(),
82            behavioral_breaks: DashMap::new(),
83            td_broadcast_tx: tx,
84            old_surface: tokio::sync::OnceCell::new(),
85            new_surface: tokio::sync::OnceCell::new(),
86            degradation: Arc::new(DegradationTracker::new()),
87        }
88    }
89
90    // ── TD operations ───────────────────────────────────────────────
91
92    /// Insert a structural break found by TD.
93    ///
94    /// Also broadcasts the qualified name to BU via the channel.
95    /// If the broadcast fails (no receivers yet), that's fine — BU
96    /// will check the DashMap directly.
97    pub fn insert_structural_break(&self, change: StructuralChange) {
98        let name = change.qualified_name.clone();
99        self.structural_breaks.insert(name.clone(), change);
100        // Best-effort broadcast; ignore SendError (no receivers)
101        let _ = self.td_broadcast_tx.send(name);
102    }
103
104    /// Insert multiple structural breaks (batch operation after diff_surfaces).
105    pub fn insert_structural_breaks(&self, changes: Vec<StructuralChange>) {
106        for change in changes {
107            self.insert_structural_break(change);
108        }
109    }
110
111    /// Set the old API surface (called by TD after extraction).
112    /// Accepts an `Arc` so the caller can retain a cheap handle.
113    pub fn set_old_surface(&self, surface: Arc<ApiSurface<L::SymbolData>>) {
114        let _ = self.old_surface.set(surface);
115    }
116
117    /// Set the new API surface (called by TD after extraction).
118    pub fn set_new_surface(&self, surface: Arc<ApiSurface<L::SymbolData>>) {
119        let _ = self.new_surface.set(surface);
120    }
121
122    // ── BU operations ───────────────────────────────────────────────
123
124    /// Subscribe to TD's broadcast channel.
125    ///
126    /// Call this once at the start of BU. Returns a `BuReceiver` that
127    /// wraps the broadcast receiver and a local skip set for efficient
128    /// repeated checks.
129    pub fn subscribe_to_td(&self) -> BuReceiver {
130        BuReceiver {
131            rx: self.td_broadcast_tx.subscribe(),
132            skip_set: HashSet::new(),
133        }
134    }
135
136    /// Check if TD already found a structural break for this symbol.
137    ///
138    /// This is the DashMap fallback — always accurate but doesn't
139    /// benefit from the broadcast channel's real-time notifications.
140    pub fn has_structural_break(&self, qualified_name: &str) -> bool {
141        self.structural_breaks.contains_key(qualified_name)
142    }
143
144    /// Insert a behavioral break found by BU.
145    pub fn insert_behavioral_break(&self, brk: BehavioralBreak<L>) {
146        self.behavioral_breaks.insert(brk.symbol.clone(), brk);
147    }
148
149    // ── Read operations (post-analysis merge) ───────────────────────
150
151    /// Get all structural breaks (for merge step).
152    pub fn structural_breaks(&self) -> &DashMap<String, StructuralChange> {
153        &self.structural_breaks
154    }
155
156    /// Get all behavioral breaks (for merge step).
157    pub fn behavioral_breaks(&self) -> &DashMap<String, BehavioralBreak<L>> {
158        &self.behavioral_breaks
159    }
160
161    /// Get the old API surface (blocks if TD hasn't set it yet).
162    pub async fn get_old_surface(&self) -> &Arc<ApiSurface<L::SymbolData>> {
163        self.old_surface
164            .get_or_init(|| async { panic!("TD must set old_surface before BU reads it") })
165            .await
166    }
167
168    /// Get the new API surface (blocks if TD hasn't set it yet).
169    pub async fn get_new_surface(&self) -> &Arc<ApiSurface<L::SymbolData>> {
170        self.new_surface
171            .get_or_init(|| async { panic!("TD must set new_surface before BU reads it") })
172            .await
173    }
174
175    /// Try to get the old surface without blocking (returns None if not set yet).
176    /// Returns a reference to the `Arc` — clone the Arc (cheap) if you need
177    /// ownership, or borrow through it with `&**arc`.
178    pub fn try_get_old_surface(&self) -> Option<&Arc<ApiSurface<L::SymbolData>>> {
179        self.old_surface.get()
180    }
181
182    /// Try to get the new surface without blocking (returns None if not set yet).
183    pub fn try_get_new_surface(&self) -> Option<&Arc<ApiSurface<L::SymbolData>>> {
184        self.new_surface.get()
185    }
186
187    // ── Degradation tracking ────────────────────────────────────────
188
189    /// Access the degradation tracker to record or query non-fatal issues.
190    ///
191    /// Available to all pipeline phases and Language implementations.
192    pub fn degradation(&self) -> &DegradationTracker {
193        &self.degradation
194    }
195
196    /// Get a clone of the Arc-wrapped degradation tracker.
197    ///
198    /// Useful when you need to pass the tracker to a spawned task.
199    pub fn degradation_arc(&self) -> Arc<DegradationTracker> {
200        self.degradation.clone()
201    }
202
203    // ── Read operations (post-analysis merge) ───────────────────────
204
205    /// Count of structural breaks found so far.
206    pub fn structural_break_count(&self) -> usize {
207        self.structural_breaks.len()
208    }
209
210    /// Count of behavioral breaks found so far.
211    pub fn behavioral_break_count(&self) -> usize {
212        self.behavioral_breaks.len()
213    }
214
215    /// Get all structural break qualified names (for reconciliation).
216    pub fn structural_break_names(&self) -> Vec<String> {
217        self.structural_breaks
218            .iter()
219            .map(|entry| entry.key().clone())
220            .collect()
221    }
222}
223
224impl<L: Language> Default for SharedFindings<L> {
225    fn default() -> Self {
226        Self::new()
227    }
228}
229
230/// BU-side broadcast receiver with local skip set.
231///
232/// The skip set accumulates qualified names received from TD's broadcast
233/// channel. Before analyzing each function, BU calls `drain_and_check()`
234/// which:
235/// 1. Drains any pending broadcast messages into the local skip set
236/// 2. Checks if the given qualified name is in the skip set
237///
238/// This is faster than checking the DashMap for every function because
239/// it avoids hash map lookups for symbols already seen via broadcast.
240pub struct BuReceiver {
241    rx: broadcast::Receiver<String>,
242    skip_set: HashSet<String>,
243}
244
245impl BuReceiver {
246    /// Drain pending broadcast messages and check if a symbol should be skipped.
247    ///
248    /// Returns `true` if the symbol was found in the broadcast skip set
249    /// (meaning TD already flagged it). The caller should ALSO check
250    /// `SharedFindings::has_structural_break()` as a fallback for messages
251    /// that arrived before subscription.
252    pub fn drain_and_check(&mut self, qualified_name: &str) -> bool {
253        // Drain all pending messages from the broadcast channel
254        loop {
255            match self.rx.try_recv() {
256                Ok(name) => {
257                    self.skip_set.insert(name);
258                }
259                Err(broadcast::error::TryRecvError::Empty) => break,
260                Err(broadcast::error::TryRecvError::Closed) => break,
261                Err(broadcast::error::TryRecvError::Lagged(n)) => {
262                    // Channel lagged — some messages were dropped.
263                    // This is fine because we also check the DashMap directly.
264                    tracing::warn!(
265                        lagged_messages = n,
266                        "BU broadcast receiver lagged; falling back to DashMap checks"
267                    );
268                    break;
269                }
270            }
271        }
272
273        self.skip_set.contains(qualified_name)
274    }
275
276    /// Check if a symbol is in the skip set WITHOUT draining.
277    /// Useful for batch checks after an initial drain.
278    pub fn is_skipped(&self, qualified_name: &str) -> bool {
279        self.skip_set.contains(qualified_name)
280    }
281
282    /// Number of symbols in the skip set.
283    pub fn skip_set_size(&self) -> usize {
284        self.skip_set.len()
285    }
286}
287
288/// Helper: check if a symbol should be skipped by BU.
289///
290/// Combines both the broadcast skip set AND the DashMap fallback.
291/// This is the recommended way for BU to check before analyzing a function.
292pub fn should_skip_for_bu<L: Language>(
293    shared: &SharedFindings<L>,
294    receiver: &mut BuReceiver,
295    qualified_name: &str,
296) -> bool {
297    receiver.drain_and_check(qualified_name) || shared.has_structural_break(qualified_name)
298}
299
#[cfg(test)]
mod tests {
    use super::*;
    use crate::test_support::TestLang;
    use crate::types::{ChangeSubject, StructuralChangeType, SymbolKind};
    use std::sync::Arc;

    /// Builds a minimal "function removed" structural change named `name`.
    fn make_structural_change(name: &str) -> StructuralChange {
        StructuralChange {
            symbol: name.to_string(),
            qualified_name: name.to_string(),
            kind: SymbolKind::Function,
            package: None,
            change_type: StructuralChangeType::Removed(ChangeSubject::Symbol {
                kind: SymbolKind::Function,
            }),
            before: None,
            after: None,
            description: format!("{} was removed", name),
            is_breaking: true,
            impact: None,
            migration_target: None,
        }
    }

    /// Builds a minimal test-delta behavioral break for symbol `name`.
    fn make_behavioral_break(name: &str) -> BehavioralBreak<TestLang> {
        BehavioralBreak {
            symbol: name.to_string(),
            caused_by: name.to_string(),
            call_path: vec![name.to_string()],
            evidence_description: "TestDelta: test assertions changed".to_string(),
            confidence: 0.95,
            description: format!("{} behavior changed", name),
            category: None,
            evidence_type: crate::types::EvidenceType::TestDelta,
            is_internal_only: None,
        }
    }

    #[test]
    fn shared_findings_basic_operations() {
        let shared: SharedFindings<TestLang> = SharedFindings::new();

        // Initially empty
        assert_eq!(shared.structural_break_count(), 0);
        assert_eq!(shared.behavioral_break_count(), 0);

        // Insert structural break
        shared.insert_structural_break(make_structural_change("foo"));
        assert_eq!(shared.structural_break_count(), 1);
        assert!(shared.has_structural_break("foo"));
        assert!(!shared.has_structural_break("bar"));

        // Insert behavioral break
        shared.insert_behavioral_break(make_behavioral_break("bar"));
        assert_eq!(shared.behavioral_break_count(), 1);
    }

    #[test]
    fn shared_findings_batch_insert() {
        let shared: SharedFindings<TestLang> = SharedFindings::new();

        let changes = vec![
            make_structural_change("a"),
            make_structural_change("b"),
            make_structural_change("c"),
        ];
        shared.insert_structural_breaks(changes);

        assert_eq!(shared.structural_break_count(), 3);
        assert!(shared.has_structural_break("a"));
        assert!(shared.has_structural_break("b"));
        assert!(shared.has_structural_break("c"));
    }

    #[test]
    fn broadcast_receiver_skip_set() {
        let shared: SharedFindings<TestLang> = SharedFindings::new();
        let mut receiver = shared.subscribe_to_td();

        // Insert a structural break (also broadcasts)
        shared.insert_structural_break(make_structural_change("foo"));

        // BU drains and checks
        assert!(receiver.drain_and_check("foo"));
        assert!(!receiver.drain_and_check("bar"));

        // After drain, "foo" stays in skip set
        assert!(receiver.is_skipped("foo"));
        assert!(!receiver.is_skipped("bar"));
    }

    #[test]
    fn broadcast_multiple_messages() {
        let shared: SharedFindings<TestLang> = SharedFindings::new();
        let mut receiver = shared.subscribe_to_td();

        // Insert several structural breaks
        shared.insert_structural_break(make_structural_change("alpha"));
        shared.insert_structural_break(make_structural_change("beta"));
        shared.insert_structural_break(make_structural_change("gamma"));

        // First drain picks up all three
        assert!(receiver.drain_and_check("alpha"));
        assert!(receiver.is_skipped("beta"));
        assert!(receiver.is_skipped("gamma"));
        assert_eq!(receiver.skip_set_size(), 3);
    }

    #[test]
    fn broadcast_lag_keeps_retained_names_and_dashmap_covers_dropped_ones() {
        let shared: SharedFindings<TestLang> = SharedFindings::new();
        let mut receiver = shared.subscribe_to_td();

        // Overflow the channel so the oldest `overflow` names are overwritten.
        let overflow = 10;
        let total = BROADCAST_CAPACITY + overflow;
        for i in 0..total {
            shared.insert_structural_break(make_structural_change(&format!("sym_{}", i)));
        }

        // Drain (twice, so the result does not depend on whether a single
        // drain continues past the lag notification).
        receiver.drain_and_check("unrelated");
        receiver.drain_and_check("unrelated");

        // Exactly the retained messages reached the skip set...
        assert_eq!(receiver.skip_set_size(), BROADCAST_CAPACITY);
        assert!(!receiver.is_skipped("sym_0"));
        assert!(receiver.is_skipped(&format!("sym_{}", overflow)));
        assert!(receiver.is_skipped(&format!("sym_{}", total - 1)));

        // ...and the overwritten ones are still found via the DashMap fallback.
        assert_eq!(shared.structural_break_count(), total);
        assert!(should_skip_for_bu(&shared, &mut receiver, "sym_0"));
    }

    #[test]
    fn should_skip_combines_broadcast_and_dashmap() {
        let shared: SharedFindings<TestLang> = SharedFindings::new();

        // Insert BEFORE subscribing — won't appear in broadcast
        shared.insert_structural_break(make_structural_change("early"));

        let mut receiver = shared.subscribe_to_td();

        // Insert AFTER subscribing — will appear in broadcast
        shared.insert_structural_break(make_structural_change("late"));

        // "early" found via DashMap fallback, "late" via broadcast
        assert!(should_skip_for_bu(&shared, &mut receiver, "early"));
        assert!(should_skip_for_bu(&shared, &mut receiver, "late"));
        assert!(!should_skip_for_bu(&shared, &mut receiver, "unknown"));
    }

    #[test]
    fn structural_break_names() {
        let shared: SharedFindings<TestLang> = SharedFindings::new();
        shared.insert_structural_break(make_structural_change("x"));
        shared.insert_structural_break(make_structural_change("y"));

        let mut names = shared.structural_break_names();
        names.sort();
        assert_eq!(names, vec!["x", "y"]);
    }

    #[test]
    fn surface_try_get_before_set() {
        let shared: SharedFindings<TestLang> = SharedFindings::new();
        assert!(shared.try_get_old_surface().is_none());
        assert!(shared.try_get_new_surface().is_none());
    }

    #[test]
    fn surface_set_and_get() {
        let shared: SharedFindings<TestLang> = SharedFindings::new();

        let surface = Arc::new(ApiSurface { symbols: vec![] });
        shared.set_old_surface(surface);
        assert!(shared.try_get_old_surface().is_some());
        assert_eq!(shared.try_get_old_surface().unwrap().symbols.len(), 0);
    }

    #[test]
    fn surface_set_is_first_write_wins() {
        let shared: SharedFindings<TestLang> = SharedFindings::new();

        let first = Arc::new(ApiSurface { symbols: vec![] });
        shared.set_old_surface(Arc::clone(&first));
        // A second publication must not replace the first one.
        shared.set_old_surface(Arc::new(ApiSurface { symbols: vec![] }));

        assert!(Arc::ptr_eq(shared.try_get_old_surface().unwrap(), &first));
    }

    #[tokio::test]
    async fn surface_async_get() {
        let shared: Arc<SharedFindings<TestLang>> = Arc::new(SharedFindings::new());

        let surface = Arc::new(ApiSurface { symbols: vec![] });
        shared.set_new_surface(surface);

        let result = shared.get_new_surface().await;
        assert_eq!(result.symbols.len(), 0);
    }

    #[tokio::test]
    #[should_panic(expected = "TD must set old_surface before BU reads it")]
    async fn surface_async_get_panics_when_unset() {
        let shared: SharedFindings<TestLang> = SharedFindings::new();
        let _ = shared.get_old_surface().await;
    }

    #[test]
    fn concurrent_inserts() {
        use std::thread;

        let shared: Arc<SharedFindings<TestLang>> = Arc::new(SharedFindings::new());
        let mut handles = Vec::new();

        // Spawn 10 threads, each inserting 100 structural breaks
        for t in 0..10 {
            let shared = shared.clone();
            handles.push(thread::spawn(move || {
                for i in 0..100 {
                    let name = format!("fn_{}_{}", t, i);
                    shared.insert_structural_break(make_structural_change(&name));
                }
            }));
        }

        for handle in handles {
            handle.join().unwrap();
        }

        assert_eq!(shared.structural_break_count(), 1000);
    }
}
490}