Skip to main content

shiplog_merge/
lib.rs

1//! Merging utilities for combining multiple event sources.
2//!
3//! Provides functions to merge and deduplicate events from multiple sources,
4//! handling conflicts and preserving coverage metadata when operating on
5//! `IngestOutput` values.
6
7use anyhow::{Result, anyhow};
8use chrono::Utc;
9use shiplog_ids::EventId;
10use shiplog_ids::RunId;
11use shiplog_ports::IngestOutput;
12use shiplog_schema::coverage::{Completeness, CoverageManifest, CoverageSlice};
13use shiplog_schema::event::EventEnvelope;
14use std::collections::HashMap;
15
/// Policy for choosing between copies of the same event ID when merging
/// [`IngestOutput`] values.
///
/// Defaults to [`ConflictResolution::PreferMostRecent`]. How ties are broken
/// depends on the merge entry point: [`merge_ingest_outputs`] keeps the copy
/// encountered first, while [`merge_ingest_outputs_legacy`] keeps the copy
/// encountered last.
#[derive(Copy, Clone, Debug, Default, Eq, PartialEq)]
pub enum ConflictResolution {
    /// Keep whichever copy was encountered first (input order).
    PreferFirst,
    /// Keep the copy with the latest `occurred_at` timestamp.
    #[default]
    PreferMostRecent,
    /// Keep the copy that scores highest on field completeness.
    PreferMostComplete,
}
27
/// Legacy strategy type retained for existing callers.
///
/// Each variant mirrors a [`ConflictResolution`] variant (see the `From`
/// conversion), and [`merge_events`] consumes this type directly. The derive
/// set matches `ConflictResolution`, so strategies can be copied and compared.
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
pub enum MergeStrategy {
    /// Keep the first event encountered for an ID; later copies are ignored.
    KeepFirst,
    /// Keep the copy with the latest `occurred_at`.
    ///
    /// Only a strictly later timestamp replaces the current copy, so on a tie
    /// the copy encountered first is kept.
    #[default]
    KeepLast,
    /// Keep the copy with the higher completeness score.
    ///
    /// Only a strictly higher score replaces the current copy, so on a tie the
    /// copy encountered first is kept.
    KeepMostComplete,
}
39
40impl From<ConflictResolution> for MergeStrategy {
41    fn from(value: ConflictResolution) -> Self {
42        match value {
43            ConflictResolution::PreferFirst => Self::KeepFirst,
44            ConflictResolution::PreferMostRecent => Self::KeepLast,
45            ConflictResolution::PreferMostComplete => Self::KeepMostComplete,
46        }
47    }
48}
49
/// Summary counters produced by [`merge_ingest_outputs`].
///
/// Plain data: it can be defaulted and compared directly, which is handy in
/// tests and for callers diffing two merge runs.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct MergeReport {
    /// Number of distinct source names in the merged coverage manifest.
    pub source_count: usize,
    /// Total events across all inputs, duplicates included.
    pub input_event_count: usize,
    /// Events remaining after deduplication by ID.
    pub output_event_count: usize,
    /// Input events dropped as duplicates (`input_event_count - output_event_count`).
    pub conflict_count: usize,
    /// Events skipped during merging; `merge_ingest_outputs` currently always reports 0.
    pub skipped_events: usize,
    /// Warnings in the merged coverage manifest, including any conflict
    /// warning added by the merge itself.
    pub warning_count: usize,
}
60
61/// Merge result that keeps output data and summary metadata together.
62#[derive(Debug, Clone)]
63pub struct MergeResult {
64    pub ingest_output: IngestOutput,
65    pub report: MergeReport,
66}
67
68/// Merge multiple event lists into one, deduplicating by event ID.
69///
70/// The strategy determines how to handle conflicts when the same event appears
71/// in multiple sources.
72pub fn merge_events(
73    sources: Vec<Vec<EventEnvelope>>,
74    strategy: &MergeStrategy,
75) -> Vec<EventEnvelope> {
76    let mut events_by_id: HashMap<EventId, EventEnvelope> = HashMap::new();
77
78    for source in sources {
79        for event in source {
80            match events_by_id.get(&event.id) {
81                Some(existing) => {
82                    let should_replace = match strategy {
83                        MergeStrategy::KeepFirst => false,
84                        MergeStrategy::KeepLast => event.occurred_at > existing.occurred_at,
85                        MergeStrategy::KeepMostComplete => {
86                            completeness_score(&event) > completeness_score(existing)
87                        }
88                    };
89                    if should_replace {
90                        events_by_id.insert(event.id.clone(), event);
91                    }
92                }
93                None => {
94                    events_by_id.insert(event.id.clone(), event);
95                }
96            }
97        }
98    }
99
100    let mut result: Vec<EventEnvelope> = events_by_id.into_values().collect();
101    result.sort_by(|a, b| {
102        a.occurred_at
103            .cmp(&b.occurred_at)
104            .then_with(|| a.id.0.cmp(&b.id.0))
105    });
106    result
107}
108
109/// Merge two event lists.
110pub fn merge_two(
111    left: &[EventEnvelope],
112    right: &[EventEnvelope],
113    strategy: &MergeStrategy,
114) -> Vec<EventEnvelope> {
115    merge_events(vec![left.to_vec(), right.to_vec()], strategy)
116}
117
118/// Merge complete ingest outputs from multiple sources.
119pub fn merge_ingest_outputs(
120    ingest_outputs: &[IngestOutput],
121    resolution: ConflictResolution,
122) -> Result<MergeResult> {
123    if ingest_outputs.is_empty() {
124        return Err(anyhow!("No ingest outputs to merge"));
125    }
126
127    let base_coverage = &ingest_outputs[0].coverage;
128    let mut event_groups = Vec::with_capacity(ingest_outputs.len());
129    let mut all_sources = Vec::new();
130    let mut all_warnings = Vec::new();
131    let mut all_slices: Vec<CoverageSlice> = Vec::new();
132    let mut input_event_count = 0usize;
133
134    let mut all_freshness = Vec::new();
135    for ingest in ingest_outputs {
136        input_event_count += ingest.events.len();
137        event_groups.push(ingest.events.clone());
138        all_sources.extend(ingest.coverage.sources.clone());
139        all_warnings.extend(ingest.coverage.warnings.clone());
140        all_slices.extend(ingest.coverage.slices.clone());
141        all_freshness.extend(ingest.freshness.clone());
142    }
143
144    let merged_events = merge_events(event_groups, &resolution.into());
145    let mut coverage = CoverageManifest {
146        run_id: RunId::now("merge"),
147        generated_at: Utc::now(),
148        user: base_coverage.user.clone(),
149        window: base_coverage.window.clone(),
150        mode: "merged".to_string(),
151        sources: {
152            all_sources.sort();
153            all_sources.dedup();
154            all_sources
155        },
156        slices: all_slices,
157        warnings: {
158            if input_event_count > merged_events.len() {
159                let conflicts = input_event_count - merged_events.len();
160                all_warnings.push(format!(
161                    "Resolved {} conflict(s) during merge using {:?} strategy",
162                    conflicts, resolution,
163                ));
164            }
165            all_warnings
166        },
167        completeness: if ingest_outputs
168            .iter()
169            .any(|o| o.coverage.completeness == Completeness::Partial)
170        {
171            Completeness::Partial
172        } else {
173            Completeness::Complete
174        },
175    };
176
177    let conflict_count = input_event_count.saturating_sub(merged_events.len());
178    coverage.slices.sort_by_key(|slice| slice.window.since);
179
180    let report = MergeReport {
181        source_count: coverage.sources.len(),
182        input_event_count,
183        output_event_count: merged_events.len(),
184        conflict_count,
185        skipped_events: 0,
186        warning_count: coverage.warnings.len(),
187    };
188
189    Ok(MergeResult {
190        ingest_output: IngestOutput {
191            events: merged_events,
192            coverage,
193            freshness: all_freshness,
194        },
195        report,
196    })
197}
198
199/// Merge multiple ingest outputs using the pre-existing engine fallback behavior.
200///
201/// Kept here to preserve CLI compatibility when the `merge-pipeline` feature
202/// is disabled in `shiplog-engine`.
203pub fn merge_ingest_outputs_legacy(
204    ingest_outputs: &[IngestOutput],
205    resolution: ConflictResolution,
206) -> Result<IngestOutput> {
207    use std::collections::HashMap;
208
209    if ingest_outputs.is_empty() {
210        return Err(anyhow!("No ingest outputs to merge"));
211    }
212
213    let mut event_map: HashMap<String, Vec<EventEnvelope>> = HashMap::new();
214    let mut all_sources: Vec<String> = Vec::new();
215    let mut all_warnings: Vec<String> = Vec::new();
216    let mut all_slices: Vec<shiplog_schema::coverage::CoverageSlice> = Vec::new();
217    let mut all_freshness: Vec<shiplog_schema::freshness::SourceFreshness> = Vec::new();
218
219    let base_output = &ingest_outputs[0];
220    let window = base_output.coverage.window.clone();
221    let user = base_output.coverage.user.clone();
222
223    for ingest in ingest_outputs {
224        for event in &ingest.events {
225            let id = event.id.to_string();
226            event_map.entry(id).or_default().push(event.clone());
227        }
228
229        all_sources.extend(ingest.coverage.sources.clone());
230        all_warnings.extend(ingest.coverage.warnings.clone());
231        all_slices.extend(ingest.coverage.slices.clone());
232        all_freshness.extend(ingest.freshness.clone());
233    }
234
235    let mut merged_events: Vec<EventEnvelope> = Vec::new();
236    let mut conflict_count = 0usize;
237
238    for (_id, events) in event_map {
239        if events.len() == 1 {
240            merged_events.push(events[0].clone());
241        } else {
242            conflict_count += 1;
243            merged_events.push(resolve_conflict_legacy(&events, resolution));
244        }
245    }
246
247    merged_events.sort_by(|a, b| {
248        a.occurred_at
249            .cmp(&b.occurred_at)
250            .then_with(|| a.id.0.cmp(&b.id.0))
251    });
252    all_sources.sort();
253    all_sources.dedup();
254
255    let completeness = if ingest_outputs
256        .iter()
257        .any(|o| o.coverage.completeness == shiplog_schema::coverage::Completeness::Partial)
258    {
259        shiplog_schema::coverage::Completeness::Partial
260    } else {
261        shiplog_schema::coverage::Completeness::Complete
262    };
263
264    if conflict_count > 0 {
265        all_warnings.push(format!(
266            "Resolved {} conflict(s) during merge using {:?} strategy",
267            conflict_count, resolution,
268        ));
269    }
270
271    let coverage = shiplog_schema::coverage::CoverageManifest {
272        run_id: RunId::now("merge"),
273        generated_at: chrono::Utc::now(),
274        user,
275        window,
276        mode: "merged".to_string(),
277        sources: all_sources,
278        slices: all_slices,
279        warnings: all_warnings,
280        completeness,
281    };
282
283    Ok(IngestOutput {
284        events: merged_events,
285        coverage,
286        freshness: all_freshness,
287    })
288}
289
290fn resolve_conflict_legacy(
291    events: &[EventEnvelope],
292    resolution: ConflictResolution,
293) -> EventEnvelope {
294    match resolution {
295        ConflictResolution::PreferFirst => events[0].clone(),
296        ConflictResolution::PreferMostRecent => events
297            .iter()
298            .max_by_key(|e| e.occurred_at)
299            .cloned()
300            .unwrap(),
301        ConflictResolution::PreferMostComplete => events
302            .iter()
303            .max_by_key(|e| completeness_score_legacy(e))
304            .cloned()
305            .unwrap(),
306    }
307}
308
309fn completeness_score_legacy(event: &EventEnvelope) -> usize {
310    let mut score = 0;
311
312    // Check for non-empty fields
313    if !event.actor.login.is_empty() {
314        score += 1;
315    }
316    if event.actor.id.is_some() {
317        score += 1;
318    }
319    if !event.repo.full_name.is_empty() {
320        score += 1;
321    }
322    if event.repo.html_url.is_some() {
323        score += 1;
324    }
325    if !event.tags.is_empty() {
326        score += 1;
327    }
328    if !event.links.is_empty() {
329        score += 1;
330    }
331    if event.source.url.is_some() {
332        score += 1;
333    }
334    if event.source.opaque_id.is_some() {
335        score += 1;
336    }
337
338    // Check payload completeness
339    match &event.payload {
340        shiplog_schema::event::EventPayload::PullRequest(pr) => {
341            if pr.additions.is_some() {
342                score += 1;
343            }
344            if pr.deletions.is_some() {
345                score += 1;
346            }
347            if pr.changed_files.is_some() {
348                score += 1;
349            }
350            if pr.merged_at.is_some() {
351                score += 1;
352            }
353        }
354        shiplog_schema::event::EventPayload::Manual(manual) => {
355            if manual.description.is_some() {
356                score += 1;
357            }
358            if manual.started_at.is_some() {
359                score += 1;
360            }
361            if manual.ended_at.is_some() {
362                score += 1;
363            }
364            if manual.impact.is_some() {
365                score += 1;
366            }
367        }
368        _ => {}
369    }
370
371    score
372}
373
374/// Calculate a completeness score for an event (higher = more complete).
375fn completeness_score(event: &EventEnvelope) -> u32 {
376    let mut score = 0;
377
378    // Check payload completeness
379    match &event.payload {
380        shiplog_schema::event::EventPayload::PullRequest(pr) => {
381            score += 10;
382            if pr.additions.is_some() {
383                score += 1;
384            }
385            if pr.deletions.is_some() {
386                score += 1;
387            }
388            if pr.changed_files.is_some() {
389                score += 1;
390            }
391            if !pr.touched_paths_hint.is_empty() {
392                score += 1;
393            }
394        }
395        shiplog_schema::event::EventPayload::Review(r) => {
396            score += 8;
397            if !r.pull_title.is_empty() {
398                score += 1;
399            }
400        }
401        shiplog_schema::event::EventPayload::Manual(m) => {
402            score += 5;
403            if m.description.is_some() {
404                score += 2;
405            }
406            if m.impact.is_some() {
407                score += 2;
408            }
409        }
410    }
411
412    // Check source completeness
413    if event.source.url.is_some() {
414        score += 1;
415    }
416    if event.source.opaque_id.is_some() {
417        score += 1;
418    }
419
420    // Check links
421    if !event.links.is_empty() {
422        score += 2;
423    }
424
425    // Check tags
426    if !event.tags.is_empty() {
427        score += 1;
428    }
429
430    score
431}
432
#[cfg(test)]
mod tests {
    // Unit, edge-case, snapshot, and property tests for the merge functions in this file.
    use super::*;
    use chrono::{NaiveDate, TimeZone, Utc};
    use shiplog_ids::EventId;
    use shiplog_schema::coverage::{CoverageManifest, CoverageSlice, TimeWindow};
    use shiplog_schema::event::{
        Actor, EventKind, EventPayload, ManualEvent, ManualEventType, RepoRef, RepoVisibility,
        SourceRef, SourceSystem,
    };
    /// Build a minimal manual "note" event with the given ID and timestamp.
    ///
    /// Actor and repo fields are populated with fixed values. Tags, links,
    /// description/impact, and the source URL/opaque ID are left empty.
    fn make_event(id: &str, occurred_at: chrono::DateTime<chrono::Utc>) -> EventEnvelope {
        EventEnvelope {
            id: EventId::from_parts([id]),
            kind: EventKind::Manual,
            occurred_at,
            actor: Actor {
                login: "testuser".to_string(),
                id: Some(123),
            },
            repo: RepoRef {
                full_name: "owner/test".to_string(),
                html_url: Some("https://github.com/owner/test".to_string()),
                visibility: RepoVisibility::Public,
            },
            payload: EventPayload::Manual(ManualEvent {
                event_type: ManualEventType::Note,
                title: "Test event".to_string(),
                description: None,
                started_at: None,
                ended_at: None,
                impact: None,
            }),
            tags: vec![],
            links: vec![],
            source: SourceRef {
                system: SourceSystem::Manual,
                url: None,
                opaque_id: None,
            },
        }
    }

    /// Like `make_event`, but with the given tags. Non-empty tags raise the
    /// event's completeness score.
    fn make_event_with_tags(
        id: &str,
        occurred_at: chrono::DateTime<chrono::Utc>,
        tags: Vec<String>,
    ) -> EventEnvelope {
        let mut e = make_event(id, occurred_at);
        e.tags = tags;
        e
    }

    /// Build a one-slice coverage manifest for January 2025 with a single
    /// source name and a single warning (even when `warning` is empty).
    /// `w` is used as both the slice's `total_count` and `fetched`.
    fn coverage(
        w: usize,
        completeness: Completeness,
        source: &str,
        warning: &str,
    ) -> CoverageManifest {
        CoverageManifest {
            run_id: RunId::now("test"),
            generated_at: Utc.timestamp_nanos(1),
            user: "tester".to_string(),
            window: TimeWindow {
                since: NaiveDate::from_ymd_opt(2025, 1, 1).unwrap(),
                until: NaiveDate::from_ymd_opt(2025, 2, 1).unwrap(),
            },
            mode: "merged".to_string(),
            sources: vec![source.to_string()],
            slices: vec![CoverageSlice {
                window: TimeWindow {
                    since: NaiveDate::from_ymd_opt(2025, 1, 1).unwrap(),
                    until: NaiveDate::from_ymd_opt(2025, 2, 1).unwrap(),
                },
                query: "q".to_string(),
                total_count: w as u64,
                fetched: w as u64,
                incomplete_results: None,
                notes: vec![],
            }],
            warnings: vec![warning.to_string()],
            completeness,
        }
    }

    #[test]
    fn merge_empty_sources() {
        let result = merge_events(vec![], &MergeStrategy::default());
        assert!(result.is_empty());
    }

    #[test]
    fn merge_single_source() {
        let events = vec![
            make_event("1", Utc.with_ymd_and_hms(2025, 1, 1, 0, 0, 0).unwrap()),
            make_event("2", Utc.with_ymd_and_hms(2025, 1, 2, 0, 0, 0).unwrap()),
        ];
        let result = merge_events(vec![events], &MergeStrategy::default());
        assert_eq!(result.len(), 2);
    }

    #[test]
    fn merge_deduplicates_by_id() {
        let event1 = make_event("1", Utc.with_ymd_and_hms(2025, 1, 1, 0, 0, 0).unwrap());
        let event2 = make_event("1", Utc.with_ymd_and_hms(2025, 1, 2, 0, 0, 0).unwrap());

        let result = merge_events(
            vec![vec![event1.clone()], vec![event2.clone()]],
            &MergeStrategy::KeepLast,
        );
        assert_eq!(result.len(), 1);
        assert_eq!(result[0].occurred_at, event2.occurred_at);
    }

    #[test]
    fn merge_keeps_first_strategy() {
        let event1 = make_event("1", Utc.with_ymd_and_hms(2025, 1, 1, 0, 0, 0).unwrap());
        let event2 = make_event("1", Utc.with_ymd_and_hms(2025, 1, 2, 0, 0, 0).unwrap());

        let result = merge_events(
            vec![vec![event1.clone()], vec![event2]],
            &MergeStrategy::KeepFirst,
        );
        assert_eq!(result.len(), 1);
        assert_eq!(result[0].occurred_at, event1.occurred_at);
    }

    #[test]
    fn merge_keeps_last_strategy() {
        let event1 = make_event("1", Utc.with_ymd_and_hms(2025, 1, 1, 0, 0, 0).unwrap());
        let event2 = make_event("1", Utc.with_ymd_and_hms(2025, 1, 2, 0, 0, 0).unwrap());

        let result = merge_events(
            vec![vec![event1], vec![event2.clone()]],
            &MergeStrategy::KeepLast,
        );
        assert_eq!(result.len(), 1);
        assert_eq!(result[0].occurred_at, event2.occurred_at);
    }

    #[test]
    fn merge_two_helper() {
        let left = vec![make_event(
            "1",
            Utc.with_ymd_and_hms(2025, 1, 1, 0, 0, 0).unwrap(),
        )];
        let right = vec![make_event(
            "2",
            Utc.with_ymd_and_hms(2025, 1, 2, 0, 0, 0).unwrap(),
        )];

        let result = merge_two(&left, &right, &MergeStrategy::default());
        assert_eq!(result.len(), 2);
    }

    #[test]
    fn merge_result_is_sorted() {
        let events = vec![
            make_event("a", Utc.with_ymd_and_hms(2025, 1, 3, 0, 0, 0).unwrap()),
            make_event("b", Utc.with_ymd_and_hms(2025, 1, 1, 0, 0, 0).unwrap()),
            make_event("c", Utc.with_ymd_and_hms(2025, 1, 2, 0, 0, 0).unwrap()),
        ];

        let result = merge_events(vec![events], &MergeStrategy::default());

        // Should be sorted by occurred_at (Jan 1, Jan 2, Jan 3)
        assert_eq!(result.len(), 3);
        assert!(result[0].occurred_at <= result[1].occurred_at);
        assert!(result[1].occurred_at <= result[2].occurred_at);
    }

    #[test]
    fn merge_ingest_outputs_unifies_coverage_and_events() {
        // Event "b" appears in both inputs, so 4 input events collapse to 3.
        // The merge adds one conflict warning on top of the two input warnings.
        let ingest_a = IngestOutput {
            events: vec![
                make_event("a", Utc.with_ymd_and_hms(2025, 1, 1, 0, 0, 0).unwrap()),
                make_event("b", Utc.with_ymd_and_hms(2025, 1, 1, 1, 0, 0).unwrap()),
            ],
            coverage: coverage(2, Completeness::Partial, "github", "a.warning"),
            freshness: Vec::new(),
        };
        let ingest_b = IngestOutput {
            events: vec![
                make_event("b", Utc.with_ymd_and_hms(2025, 1, 1, 2, 0, 0).unwrap()),
                make_event("c", Utc.with_ymd_and_hms(2025, 1, 1, 3, 0, 0).unwrap()),
            ],
            coverage: coverage(2, Completeness::Complete, "local_git", "b.warning"),
            freshness: Vec::new(),
        };

        let merged =
            merge_ingest_outputs(&[ingest_a, ingest_b], ConflictResolution::PreferMostRecent)
                .unwrap();

        assert_eq!(merged.ingest_output.events.len(), 3);
        assert_eq!(merged.report.conflict_count, 1);
        assert_eq!(merged.ingest_output.coverage.sources.len(), 2);
        assert_eq!(merged.ingest_output.coverage.warnings.len(), 3);
        assert_eq!(
            merged.ingest_output.coverage.completeness,
            Completeness::Partial
        );
        assert_eq!(merged.ingest_output.coverage.mode, "merged");
    }

    #[test]
    fn merge_ingest_outputs_rejects_empty_input() {
        let err = merge_ingest_outputs(&[], ConflictResolution::PreferMostRecent)
            .expect_err("expected empty input error");
        assert!(
            err.to_string().contains("No ingest outputs to merge"),
            "{err}"
        );
    }

    // --- Edge-case tests ---

    #[test]
    fn merge_multiple_sources_no_overlap() {
        let t1 = Utc.with_ymd_and_hms(2025, 1, 1, 0, 0, 0).unwrap();
        let t2 = Utc.with_ymd_and_hms(2025, 1, 2, 0, 0, 0).unwrap();
        let t3 = Utc.with_ymd_and_hms(2025, 1, 3, 0, 0, 0).unwrap();
        let result = merge_events(
            vec![
                vec![make_event("a", t1)],
                vec![make_event("b", t2)],
                vec![make_event("c", t3)],
            ],
            &MergeStrategy::default(),
        );
        assert_eq!(result.len(), 3);
        assert!(result[0].occurred_at <= result[1].occurred_at);
        assert!(result[1].occurred_at <= result[2].occurred_at);
    }

    #[test]
    fn merge_all_duplicates() {
        let t = Utc.with_ymd_and_hms(2025, 1, 1, 0, 0, 0).unwrap();
        let e = make_event("same", t);
        let result = merge_events(
            vec![vec![e.clone()], vec![e.clone()], vec![e.clone()]],
            &MergeStrategy::KeepFirst,
        );
        assert_eq!(result.len(), 1);
    }

    #[test]
    fn merge_keeps_most_complete_strategy() {
        // Same ID and timestamp; only the tags differ, so the tagged copy scores higher.
        let t = Utc.with_ymd_and_hms(2025, 1, 1, 0, 0, 0).unwrap();
        let sparse = make_event("x", t);
        let rich = make_event_with_tags("x", t, vec!["tag1".into(), "tag2".into()]);
        let result = merge_events(
            vec![vec![sparse], vec![rich.clone()]],
            &MergeStrategy::KeepMostComplete,
        );
        assert_eq!(result.len(), 1);
        assert_eq!(result[0].tags, rich.tags);
    }

    #[test]
    fn merge_preserves_order_same_timestamp() {
        let t = Utc.with_ymd_and_hms(2025, 1, 1, 0, 0, 0).unwrap();
        let events = vec![make_event("z", t), make_event("a", t), make_event("m", t)];
        let result = merge_events(vec![events], &MergeStrategy::default());
        assert_eq!(result.len(), 3);
        // With the same timestamp, ties are broken by the ID string (`id.0`).
        for w in result.windows(2) {
            assert!(w[0].occurred_at <= w[1].occurred_at);
            if w[0].occurred_at == w[1].occurred_at {
                assert!(w[0].id.0 <= w[1].id.0);
            }
        }
    }

    #[test]
    fn merge_single_event_per_source() {
        let t1 = Utc.with_ymd_and_hms(2025, 1, 1, 0, 0, 0).unwrap();
        let t2 = Utc.with_ymd_and_hms(2025, 1, 2, 0, 0, 0).unwrap();
        let result = merge_events(
            vec![vec![make_event("a", t1)], vec![make_event("b", t2)]],
            &MergeStrategy::default(),
        );
        assert_eq!(result.len(), 2);
    }

    #[test]
    fn merge_two_empty_lists() {
        let result = merge_two(&[], &[], &MergeStrategy::default());
        assert!(result.is_empty());
    }

    #[test]
    fn merge_two_one_empty() {
        let t = Utc.with_ymd_and_hms(2025, 1, 1, 0, 0, 0).unwrap();
        let events = vec![make_event("a", t)];
        let result = merge_two(&events, &[], &MergeStrategy::default());
        assert_eq!(result.len(), 1);
    }

    #[test]
    fn merge_ingest_outputs_single_source() {
        let ingest = IngestOutput {
            events: vec![make_event(
                "a",
                Utc.with_ymd_and_hms(2025, 1, 1, 0, 0, 0).unwrap(),
            )],
            coverage: coverage(1, Completeness::Complete, "github", ""),
            freshness: Vec::new(),
        };
        let merged = merge_ingest_outputs(&[ingest], ConflictResolution::PreferFirst).unwrap();
        assert_eq!(merged.ingest_output.events.len(), 1);
        assert_eq!(merged.report.conflict_count, 0);
    }

    #[test]
    fn merge_ingest_outputs_all_complete() {
        let ingest_a = IngestOutput {
            events: vec![make_event(
                "a",
                Utc.with_ymd_and_hms(2025, 1, 1, 0, 0, 0).unwrap(),
            )],
            coverage: coverage(1, Completeness::Complete, "github", ""),
            freshness: Vec::new(),
        };
        let ingest_b = IngestOutput {
            events: vec![make_event(
                "b",
                Utc.with_ymd_and_hms(2025, 1, 2, 0, 0, 0).unwrap(),
            )],
            coverage: coverage(1, Completeness::Complete, "local_git", ""),
            freshness: Vec::new(),
        };
        let merged =
            merge_ingest_outputs(&[ingest_a, ingest_b], ConflictResolution::PreferFirst).unwrap();
        assert_eq!(
            merged.ingest_output.coverage.completeness,
            Completeness::Complete
        );
    }

    #[test]
    fn merge_legacy_rejects_empty_input() {
        let err = merge_ingest_outputs_legacy(&[], ConflictResolution::PreferFirst)
            .expect_err("expected error");
        assert!(err.to_string().contains("No ingest outputs to merge"));
    }

    #[test]
    fn merge_legacy_deduplicates() {
        // The same output passed twice: each event ID appears twice and must collapse to one.
        let t = Utc.with_ymd_and_hms(2025, 1, 1, 0, 0, 0).unwrap();
        let ingest = IngestOutput {
            events: vec![make_event("a", t)],
            coverage: coverage(1, Completeness::Complete, "github", ""),
            freshness: Vec::new(),
        };
        let merged =
            merge_ingest_outputs_legacy(&[ingest.clone(), ingest], ConflictResolution::PreferFirst)
                .unwrap();
        assert_eq!(merged.events.len(), 1);
    }

    #[test]
    fn conflict_resolution_to_merge_strategy_mapping() {
        let s: MergeStrategy = ConflictResolution::PreferFirst.into();
        assert!(matches!(s, MergeStrategy::KeepFirst));
        let s: MergeStrategy = ConflictResolution::PreferMostRecent.into();
        assert!(matches!(s, MergeStrategy::KeepLast));
        let s: MergeStrategy = ConflictResolution::PreferMostComplete.into();
        assert!(matches!(s, MergeStrategy::KeepMostComplete));
    }

    // --- Snapshot tests ---
    // NOTE(review): insta derives the default snapshot name from the test fn name,
    // so renaming these tests presumably requires re-accepting their snapshots.

    #[test]
    fn snapshot_merge_report() {
        let t1 = Utc.with_ymd_and_hms(2025, 1, 1, 0, 0, 0).unwrap();
        let t2 = Utc.with_ymd_and_hms(2025, 1, 2, 0, 0, 0).unwrap();
        let ingest_a = IngestOutput {
            events: vec![make_event("a", t1), make_event("shared", t1)],
            coverage: coverage(2, Completeness::Complete, "github", "warn-a"),
            freshness: Vec::new(),
        };
        let ingest_b = IngestOutput {
            events: vec![make_event("shared", t2), make_event("b", t2)],
            coverage: coverage(2, Completeness::Complete, "local_git", "warn-b"),
            freshness: Vec::new(),
        };
        let merged =
            merge_ingest_outputs(&[ingest_a, ingest_b], ConflictResolution::PreferMostRecent)
                .unwrap();
        insta::assert_debug_snapshot!(merged.report);
    }

    #[test]
    fn snapshot_merged_event_ids() {
        let t1 = Utc.with_ymd_and_hms(2025, 1, 1, 0, 0, 0).unwrap();
        let t2 = Utc.with_ymd_and_hms(2025, 1, 2, 0, 0, 0).unwrap();
        let t3 = Utc.with_ymd_and_hms(2025, 1, 3, 0, 0, 0).unwrap();
        let result = merge_events(
            vec![
                vec![make_event("alpha", t1), make_event("beta", t2)],
                vec![make_event("beta", t2), make_event("gamma", t3)],
            ],
            &MergeStrategy::KeepFirst,
        );
        let ids: Vec<&str> = result.iter().map(|e| e.id.0.as_str()).collect();
        insta::assert_debug_snapshot!(ids);
    }

    // --- Property tests ---

    // Generated IDs are distinct within each test, so these properties exercise
    // ordering and ID preservation rather than conflict resolution.
    mod prop {
        use super::*;
        use proptest::prelude::*;
        use std::collections::HashSet;

        proptest! {
            #[test]
            fn merge_output_length_le_input(n1 in 0usize..5, n2 in 0usize..5) {
                let t = Utc.with_ymd_and_hms(2025, 1, 1, 0, 0, 0).unwrap();
                let src1: Vec<_> = (0..n1).map(|i| make_event(&format!("a{i}"), t)).collect();
                let src2: Vec<_> = (0..n2).map(|i| make_event(&format!("b{i}"), t)).collect();
                let result = merge_events(vec![src1, src2], &MergeStrategy::KeepFirst);
                prop_assert!(result.len() <= n1 + n2);
            }

            #[test]
            fn merge_output_always_sorted(n in 1usize..8) {
                let events: Vec<_> = (0..n).map(|i| {
                    let day = (i % 28) as u32 + 1;
                    make_event(&format!("e{i}"), Utc.with_ymd_and_hms(2025, 1, day, 0, 0, 0).unwrap())
                }).collect();
                let result = merge_events(vec![events], &MergeStrategy::default());
                for w in result.windows(2) {
                    prop_assert!(w[0].occurred_at <= w[1].occurred_at);
                }
            }

            #[test]
            fn merge_is_idempotent(n in 1usize..5) {
                let t = Utc.with_ymd_and_hms(2025, 1, 1, 0, 0, 0).unwrap();
                let events: Vec<_> = (0..n).map(|i| make_event(&format!("e{i}"), t)).collect();
                let first = merge_events(vec![events.clone()], &MergeStrategy::KeepFirst);
                let second = merge_events(vec![first.clone()], &MergeStrategy::KeepFirst);
                prop_assert_eq!(first.len(), second.len());
                for (a, b) in first.iter().zip(second.iter()) {
                    prop_assert_eq!(&a.id, &b.id);
                }
            }

            #[test]
            fn all_unique_ids_preserved(n1 in 0usize..4, n2 in 0usize..4) {
                let t = Utc.with_ymd_and_hms(2025, 1, 1, 0, 0, 0).unwrap();
                let src1: Vec<_> = (0..n1).map(|i| make_event(&format!("s1_{i}"), t)).collect();
                let src2: Vec<_> = (0..n2).map(|i| make_event(&format!("s2_{i}"), t)).collect();
                let mut all_ids: HashSet<_> = src1.iter().map(|e| e.id.clone()).collect();
                all_ids.extend(src2.iter().map(|e| e.id.clone()));
                let result = merge_events(vec![src1, src2], &MergeStrategy::KeepFirst);
                let result_ids: HashSet<_> = result.iter().map(|e| e.id.clone()).collect();
                prop_assert_eq!(all_ids, result_ids);
            }
        }
    }
}