semver-analyzer-core 0.0.4

Core types, traits, and diff engine for the semver-analyzer
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
//! Concurrent shared state for TD/BU pipeline coordination.
//!
//! `SharedFindings` is the central coordination point between the TD (Top-Down)
//! and BU (Bottom-Up) pipelines running concurrently. It provides:
//!
//! 1. **DashMap** for thread-safe structural and behavioral break storage
//! 2. **Broadcast channel** for real-time TD→BU notifications (avoids
//!    redundant LLM calls on symbols TD has already identified)
//! 3. **OnceCell** for the API surfaces extracted by TD (BU reads these
//!    for visibility resolution)
//!
//! ## Coordination Protocol
//!
//! ```text
//! TD pipeline:                         BU pipeline:
//! 1. Extract API surface at ref A      1. Parse git diff
//! 2. Extract API surface at ref B      2. Extract changed functions
//! 3. diff_surfaces()                   3. For each function:
//! 4. For each structural break:           a. Drain broadcast channel
//!    a. Insert into DashMap               b. Check DashMap + skip set
//!    b. Broadcast qualified_name          c. If not found: analyze
//! ```

use crate::diagnostics::DegradationTracker;
use crate::traits::Language;
use crate::types::{ApiSurface, BehavioralBreak, StructuralChange};
use dashmap::DashMap;
use std::collections::HashSet;
use std::sync::Arc;
use tokio::sync::broadcast;

/// Number of qualified names the TD→BU broadcast channel buffers.
///
/// If TD sends more than this many names before BU drains its receiver,
/// the oldest buffered names are overwritten and the receiver reports a lag.
/// No findings are lost as long as BU also consults the structural-break
/// DashMap, which `should_skip_for_bu()` does.
const BROADCAST_CAPACITY: usize = 4_096;

/// Concurrent shared state between TD and BU pipelines.
///
/// Generic over `L: Language` so that `BehavioralBreak<L>` carries
/// typed category data instead of stringly-typed labels.
///
/// Thread-safe: all fields use concurrent data structures.
/// Wrapped in `Arc` for sharing between async tasks.
pub struct SharedFindings<L: Language> {
    /// Structural breaks found by TD. Keyed by qualified_name.
    /// TD inserts, BU checks before analyzing each function.
    structural_breaks: DashMap<String, StructuralChange>,

    /// Behavioral breaks found by BU. Keyed by qualified_name.
    /// BU inserts after spec inference confirms a behavioral change.
    behavioral_breaks: DashMap<String, BehavioralBreak<L>>,

    /// Broadcast sender: TD sends qualified names here as it finds
    /// structural breaks. BU subscribes and drains pending messages
    /// into a local skip set before each function analysis.
    ///
    /// This avoids BU redundantly analyzing functions that TD is about
    /// to (or just did) flag structurally. The broadcast is "best effort" —
    /// BU also checks the DashMap directly as a fallback.
    td_broadcast_tx: broadcast::Sender<String>,

    /// API surface from the OLD ref (set by TD after extraction).
    /// Stored as `Arc` so consumers can cheaply share the surface
    /// across concurrent tasks without deep-cloning.
    old_surface: tokio::sync::OnceCell<Arc<ApiSurface<L::SymbolData>>>,

    /// API surface from the NEW ref (set by TD after extraction).
    new_surface: tokio::sync::OnceCell<Arc<ApiSurface<L::SymbolData>>>,

    /// Non-fatal issues that degraded analysis quality.
    /// Accessible to all pipeline phases via `degradation()`.
    degradation: Arc<DegradationTracker>,
}

impl<L: Language> SharedFindings<L> {
    /// Create new shared state with an empty broadcast channel.
    pub fn new() -> Self {
        let (tx, _) = broadcast::channel(BROADCAST_CAPACITY);
        Self {
            structural_breaks: DashMap::new(),
            behavioral_breaks: DashMap::new(),
            td_broadcast_tx: tx,
            old_surface: tokio::sync::OnceCell::new(),
            new_surface: tokio::sync::OnceCell::new(),
            degradation: Arc::new(DegradationTracker::new()),
        }
    }

    // ── TD operations ───────────────────────────────────────────────

    /// Insert a structural break found by TD.
    ///
    /// Also broadcasts the qualified name to BU via the channel.
    /// If the broadcast fails (no receivers yet), that's fine — BU
    /// will check the DashMap directly.
    pub fn insert_structural_break(&self, change: StructuralChange) {
        let name = change.qualified_name.clone();
        self.structural_breaks.insert(name.clone(), change);
        // Best-effort broadcast; ignore SendError (no receivers)
        let _ = self.td_broadcast_tx.send(name);
    }

    /// Insert multiple structural breaks (batch operation after diff_surfaces).
    pub fn insert_structural_breaks(&self, changes: Vec<StructuralChange>) {
        for change in changes {
            self.insert_structural_break(change);
        }
    }

    /// Set the old API surface (called by TD after extraction).
    /// Accepts an `Arc` so the caller can retain a cheap handle.
    pub fn set_old_surface(&self, surface: Arc<ApiSurface<L::SymbolData>>) {
        let _ = self.old_surface.set(surface);
    }

    /// Set the new API surface (called by TD after extraction).
    pub fn set_new_surface(&self, surface: Arc<ApiSurface<L::SymbolData>>) {
        let _ = self.new_surface.set(surface);
    }

    // ── BU operations ───────────────────────────────────────────────

    /// Subscribe to TD's broadcast channel.
    ///
    /// Call this once at the start of BU. Returns a `BuReceiver` that
    /// wraps the broadcast receiver and a local skip set for efficient
    /// repeated checks.
    pub fn subscribe_to_td(&self) -> BuReceiver {
        BuReceiver {
            rx: self.td_broadcast_tx.subscribe(),
            skip_set: HashSet::new(),
        }
    }

    /// Check if TD already found a structural break for this symbol.
    ///
    /// This is the DashMap fallback — always accurate but doesn't
    /// benefit from the broadcast channel's real-time notifications.
    pub fn has_structural_break(&self, qualified_name: &str) -> bool {
        self.structural_breaks.contains_key(qualified_name)
    }

    /// Insert a behavioral break found by BU.
    pub fn insert_behavioral_break(&self, brk: BehavioralBreak<L>) {
        self.behavioral_breaks.insert(brk.symbol.clone(), brk);
    }

    // ── Read operations (post-analysis merge) ───────────────────────

    /// Get all structural breaks (for merge step).
    pub fn structural_breaks(&self) -> &DashMap<String, StructuralChange> {
        &self.structural_breaks
    }

    /// Get all behavioral breaks (for merge step).
    pub fn behavioral_breaks(&self) -> &DashMap<String, BehavioralBreak<L>> {
        &self.behavioral_breaks
    }

    /// Get the old API surface (blocks if TD hasn't set it yet).
    pub async fn get_old_surface(&self) -> &Arc<ApiSurface<L::SymbolData>> {
        self.old_surface
            .get_or_init(|| async { panic!("TD must set old_surface before BU reads it") })
            .await
    }

    /// Get the new API surface (blocks if TD hasn't set it yet).
    pub async fn get_new_surface(&self) -> &Arc<ApiSurface<L::SymbolData>> {
        self.new_surface
            .get_or_init(|| async { panic!("TD must set new_surface before BU reads it") })
            .await
    }

    /// Try to get the old surface without blocking (returns None if not set yet).
    /// Returns a reference to the `Arc` — clone the Arc (cheap) if you need
    /// ownership, or borrow through it with `&**arc`.
    pub fn try_get_old_surface(&self) -> Option<&Arc<ApiSurface<L::SymbolData>>> {
        self.old_surface.get()
    }

    /// Try to get the new surface without blocking (returns None if not set yet).
    pub fn try_get_new_surface(&self) -> Option<&Arc<ApiSurface<L::SymbolData>>> {
        self.new_surface.get()
    }

    // ── Degradation tracking ────────────────────────────────────────

    /// Access the degradation tracker to record or query non-fatal issues.
    ///
    /// Available to all pipeline phases and Language implementations.
    pub fn degradation(&self) -> &DegradationTracker {
        &self.degradation
    }

    /// Get a clone of the Arc-wrapped degradation tracker.
    ///
    /// Useful when you need to pass the tracker to a spawned task.
    pub fn degradation_arc(&self) -> Arc<DegradationTracker> {
        self.degradation.clone()
    }

    // ── Read operations (post-analysis merge) ───────────────────────

    /// Count of structural breaks found so far.
    pub fn structural_break_count(&self) -> usize {
        self.structural_breaks.len()
    }

    /// Count of behavioral breaks found so far.
    pub fn behavioral_break_count(&self) -> usize {
        self.behavioral_breaks.len()
    }

    /// Get all structural break qualified names (for reconciliation).
    pub fn structural_break_names(&self) -> Vec<String> {
        self.structural_breaks
            .iter()
            .map(|entry| entry.key().clone())
            .collect()
    }
}

impl<L: Language> Default for SharedFindings<L> {
    fn default() -> Self {
        Self::new()
    }
}

/// BU-side broadcast receiver with a local skip set.
///
/// The skip set accumulates qualified names received from TD's broadcast
/// channel. Before analyzing each function, BU calls `drain_and_check()`,
/// which:
/// 1. Drains any pending broadcast messages into the local skip set
/// 2. Checks whether the given qualified name is in the skip set
///
/// The skip set is owned by this receiver, so lookups in it never touch the
/// shared structural-break DashMap in `SharedFindings`. It is not a complete
/// view of TD's findings, though. Names broadcast before this receiver
/// subscribed, or overwritten because the receiver lagged, exist only in the
/// DashMap. Use `should_skip_for_bu()` to consult both.
#[derive(Debug)]
pub struct BuReceiver {
    /// Subscription to TD's qualified-name broadcasts.
    rx: broadcast::Receiver<String>,
    /// Every qualified name received from the channel so far.
    skip_set: HashSet<String>,
}

impl BuReceiver {
    /// Drain pending broadcast messages and check if a symbol should be skipped.
    ///
    /// Returns `true` if the symbol was found in the broadcast skip set
    /// (meaning TD already flagged it). The caller should ALSO check
    /// `SharedFindings::has_structural_break()` as a fallback for messages
    /// that arrived before subscription.
    pub fn drain_and_check(&mut self, qualified_name: &str) -> bool {
        // Drain all pending messages from the broadcast channel
        loop {
            match self.rx.try_recv() {
                Ok(name) => {
                    self.skip_set.insert(name);
                }
                Err(broadcast::error::TryRecvError::Empty) => break,
                Err(broadcast::error::TryRecvError::Closed) => break,
                Err(broadcast::error::TryRecvError::Lagged(n)) => {
                    // Channel lagged — some messages were dropped.
                    // This is fine because we also check the DashMap directly.
                    tracing::warn!(
                        lagged_messages = n,
                        "BU broadcast receiver lagged; falling back to DashMap checks"
                    );
                    break;
                }
            }
        }

        self.skip_set.contains(qualified_name)
    }

    /// Check if a symbol is in the skip set WITHOUT draining.
    /// Useful for batch checks after an initial drain.
    pub fn is_skipped(&self, qualified_name: &str) -> bool {
        self.skip_set.contains(qualified_name)
    }

    /// Number of symbols in the skip set.
    pub fn skip_set_size(&self) -> usize {
        self.skip_set.len()
    }
}

/// Decide whether BU should skip analyzing `qualified_name`.
///
/// The broadcast skip set is consulted first; this also drains any pending
/// TD notifications into it. Only when that misses is the shared
/// structural-break DashMap queried. This is the recommended pre-analysis
/// check for BU.
pub fn should_skip_for_bu<L>(
    shared: &SharedFindings<L>,
    receiver: &mut BuReceiver,
    qualified_name: &str,
) -> bool
where
    L: Language,
{
    if receiver.drain_and_check(qualified_name) {
        return true;
    }
    shared.has_structural_break(qualified_name)
}

#[cfg(test)]
mod tests {
    // Unit tests for SharedFindings / BuReceiver coordination.
    use super::*;
    use crate::test_support::TestLang;
    use crate::types::{ChangeSubject, StructuralChangeType, SymbolKind};
    use std::sync::Arc;

    fn make_structural_change(name: &str) -> StructuralChange {
        StructuralChange {
            symbol: name.to_string(),
            qualified_name: name.to_string(),
            kind: SymbolKind::Function,
            package: None,
            change_type: StructuralChangeType::Removed(ChangeSubject::Symbol {
                kind: SymbolKind::Function,
            }),
            before: None,
            after: None,
            description: format!("{} was removed", name),
            is_breaking: true,
            impact: None,
            migration_target: None,
        }
    }

    fn make_behavioral_break(name: &str) -> BehavioralBreak<TestLang> {
        BehavioralBreak {
            symbol: name.to_string(),
            caused_by: name.to_string(),
            call_path: vec![name.to_string()],
            evidence_description: "TestDelta: test assertions changed".to_string(),
            confidence: 0.95,
            description: format!("{} behavior changed", name),
            category: None,
            evidence_type: crate::types::EvidenceType::TestDelta,
            is_internal_only: None,
        }
    }

    #[test]
    fn shared_findings_basic_operations() {
        let shared: SharedFindings<TestLang> = SharedFindings::new();

        // Initially empty
        assert_eq!(shared.structural_break_count(), 0);
        assert_eq!(shared.behavioral_break_count(), 0);

        // Insert structural break
        shared.insert_structural_break(make_structural_change("foo"));
        assert_eq!(shared.structural_break_count(), 1);
        assert!(shared.has_structural_break("foo"));
        assert!(!shared.has_structural_break("bar"));

        // Insert behavioral break
        shared.insert_behavioral_break(make_behavioral_break("bar"));
        assert_eq!(shared.behavioral_break_count(), 1);
    }

    #[test]
    fn default_is_empty() {
        let shared: SharedFindings<TestLang> = SharedFindings::default();
        assert_eq!(shared.structural_break_count(), 0);
        assert_eq!(shared.behavioral_break_count(), 0);
        assert!(shared.try_get_old_surface().is_none());
        assert!(shared.try_get_new_surface().is_none());
    }

    #[test]
    fn shared_findings_batch_insert() {
        let shared: SharedFindings<TestLang> = SharedFindings::new();

        let changes = vec![
            make_structural_change("a"),
            make_structural_change("b"),
            make_structural_change("c"),
        ];
        shared.insert_structural_breaks(changes);

        assert_eq!(shared.structural_break_count(), 3);
        assert!(shared.has_structural_break("a"));
        assert!(shared.has_structural_break("b"));
        assert!(shared.has_structural_break("c"));
    }

    #[test]
    fn broadcast_receiver_skip_set() {
        let shared: SharedFindings<TestLang> = SharedFindings::new();
        let mut receiver = shared.subscribe_to_td();

        // Insert a structural break (also broadcasts)
        shared.insert_structural_break(make_structural_change("foo"));

        // BU drains and checks
        assert!(receiver.drain_and_check("foo"));
        assert!(!receiver.drain_and_check("bar"));

        // After drain, "foo" stays in skip set
        assert!(receiver.is_skipped("foo"));
        assert!(!receiver.is_skipped("bar"));
    }

    #[test]
    fn broadcast_multiple_messages() {
        let shared: SharedFindings<TestLang> = SharedFindings::new();
        let mut receiver = shared.subscribe_to_td();

        // Insert several structural breaks
        shared.insert_structural_break(make_structural_change("alpha"));
        shared.insert_structural_break(make_structural_change("beta"));
        shared.insert_structural_break(make_structural_change("gamma"));

        // First drain picks up all three
        assert!(receiver.drain_and_check("alpha"));
        assert!(receiver.is_skipped("beta"));
        assert!(receiver.is_skipped("gamma"));
        assert_eq!(receiver.skip_set_size(), 3);
    }

    #[test]
    fn should_skip_combines_broadcast_and_dashmap() {
        let shared: SharedFindings<TestLang> = SharedFindings::new();

        // Insert BEFORE subscribing — won't appear in broadcast
        shared.insert_structural_break(make_structural_change("early"));

        let mut receiver = shared.subscribe_to_td();

        // Insert AFTER subscribing — will appear in broadcast
        shared.insert_structural_break(make_structural_change("late"));

        // "early" found via DashMap fallback, "late" via broadcast
        assert!(should_skip_for_bu(&shared, &mut receiver, "early"));
        assert!(should_skip_for_bu(&shared, &mut receiver, "late"));
        assert!(!should_skip_for_bu(&shared, &mut receiver, "unknown"));
    }

    #[test]
    fn lagged_receiver_falls_back_to_dashmap() {
        let shared: SharedFindings<TestLang> = SharedFindings::new();
        let mut receiver = shared.subscribe_to_td();

        // Overflow the channel so the earliest names are overwritten before
        // BU reads them. tokio rounds the capacity up to a power of two.
        let total = BROADCAST_CAPACITY.next_power_of_two() + 10;
        for i in 0..total {
            shared.insert_structural_break(make_structural_change(&format!("fn_{}", i)));
        }

        // "fn_0" was evicted from the broadcast buffer but remains in the DashMap.
        assert!(should_skip_for_bu(&shared, &mut receiver, "fn_0"));
        assert!(!should_skip_for_bu(&shared, &mut receiver, "missing"));

        // The names still buffered after the lag reach the skip set.
        assert!(receiver.is_skipped(&format!("fn_{}", total - 1)));
        assert!(!receiver.is_skipped("fn_0"));
    }

    #[test]
    fn structural_break_names() {
        let shared: SharedFindings<TestLang> = SharedFindings::new();
        shared.insert_structural_break(make_structural_change("x"));
        shared.insert_structural_break(make_structural_change("y"));

        let mut names = shared.structural_break_names();
        names.sort();
        assert_eq!(names, vec!["x", "y"]);
    }

    #[test]
    fn surface_try_get_before_set() {
        let shared: SharedFindings<TestLang> = SharedFindings::new();
        assert!(shared.try_get_old_surface().is_none());
        assert!(shared.try_get_new_surface().is_none());
    }

    #[test]
    fn surface_set_and_get() {
        let shared: SharedFindings<TestLang> = SharedFindings::new();

        let surface = Arc::new(ApiSurface { symbols: vec![] });
        shared.set_old_surface(surface);
        assert!(shared.try_get_old_surface().is_some());
        assert_eq!(shared.try_get_old_surface().unwrap().symbols.len(), 0);
    }

    #[test]
    fn second_surface_set_is_ignored() {
        let shared: SharedFindings<TestLang> = SharedFindings::new();

        let first = Arc::new(ApiSurface { symbols: vec![] });
        shared.set_old_surface(Arc::clone(&first));
        shared.set_old_surface(Arc::new(ApiSurface { symbols: vec![] }));

        // The first value wins; the second set is dropped.
        assert!(Arc::ptr_eq(shared.try_get_old_surface().unwrap(), &first));
    }

    #[tokio::test]
    async fn surface_async_get() {
        let shared: Arc<SharedFindings<TestLang>> = Arc::new(SharedFindings::new());

        let surface = Arc::new(ApiSurface { symbols: vec![] });
        shared.set_new_surface(surface);

        let result = shared.get_new_surface().await;
        assert_eq!(result.symbols.len(), 0);
    }

    #[test]
    fn concurrent_inserts() {
        use std::thread;

        let shared: Arc<SharedFindings<TestLang>> = Arc::new(SharedFindings::new());
        let mut handles = Vec::new();

        // Spawn 10 threads, each inserting 100 structural breaks
        for t in 0..10 {
            let shared = shared.clone();
            handles.push(thread::spawn(move || {
                for i in 0..100 {
                    let name = format!("fn_{}_{}", t, i);
                    shared.insert_structural_break(make_structural_change(&name));
                }
            }));
        }

        for handle in handles {
            handle.join().unwrap();
        }

        assert_eq!(shared.structural_break_count(), 1000);
    }
}