1use anyhow::{Result, anyhow};
8use chrono::Utc;
9use shiplog_ids::EventId;
10use shiplog_ids::RunId;
11use shiplog_ports::IngestOutput;
12use shiplog_schema::coverage::{Completeness, CoverageManifest, CoverageSlice};
13use shiplog_schema::event::EventEnvelope;
14use std::collections::HashMap;
15
/// Policy for choosing one event when several ingest outputs contain events
/// with the same [`EventId`].
///
/// Each policy converts into the equivalent [`MergeStrategy`] via `From`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum ConflictResolution {
    /// Keep the copy encountered first, in input order.
    PreferFirst,
    /// Prefer the copy with the latest `occurred_at` timestamp (the default).
    #[default]
    PreferMostRecent,
    /// Prefer the copy that scores highest on the completeness heuristic.
    PreferMostComplete,
}
27
/// Rule used by [`merge_events`] to decide which of two events sharing an
/// [`EventId`] survives the merge.
///
/// Derives the same traits as [`ConflictResolution`] so strategies can be
/// copied freely and compared with `==` / `assert_eq!`.
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
pub enum MergeStrategy {
    /// Never replace an event once one with the same id has been kept.
    KeepFirst,
    /// Prefer the copy with the newest `occurred_at` timestamp (the default).
    #[default]
    KeepLast,
    /// Prefer the copy with the higher completeness score.
    KeepMostComplete,
}
39
40impl From<ConflictResolution> for MergeStrategy {
41 fn from(value: ConflictResolution) -> Self {
42 match value {
43 ConflictResolution::PreferFirst => Self::KeepFirst,
44 ConflictResolution::PreferMostRecent => Self::KeepLast,
45 ConflictResolution::PreferMostComplete => Self::KeepMostComplete,
46 }
47 }
48}
49
/// Statistics describing one [`merge_ingest_outputs`] run.
///
/// Field order is significant: it determines the derived `Debug` output, which
/// the snapshot tests capture.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct MergeReport {
    /// Number of distinct source names in the merged coverage manifest.
    pub source_count: usize,
    /// Total number of events across all inputs, before deduplication.
    pub input_event_count: usize,
    /// Number of events remaining after deduplication.
    pub output_event_count: usize,
    /// `input_event_count - output_event_count`: events dropped as duplicates.
    pub conflict_count: usize,
    /// Events skipped during the merge; `merge_ingest_outputs` currently always
    /// sets this to 0.
    pub skipped_events: usize,
    /// Number of warnings in the merged coverage manifest, including the
    /// conflict-resolution note when one was added.
    pub warning_count: usize,
}
60
61#[derive(Debug, Clone)]
63pub struct MergeResult {
64 pub ingest_output: IngestOutput,
65 pub report: MergeReport,
66}
67
68pub fn merge_events(
73 sources: Vec<Vec<EventEnvelope>>,
74 strategy: &MergeStrategy,
75) -> Vec<EventEnvelope> {
76 let mut events_by_id: HashMap<EventId, EventEnvelope> = HashMap::new();
77
78 for source in sources {
79 for event in source {
80 match events_by_id.get(&event.id) {
81 Some(existing) => {
82 let should_replace = match strategy {
83 MergeStrategy::KeepFirst => false,
84 MergeStrategy::KeepLast => event.occurred_at > existing.occurred_at,
85 MergeStrategy::KeepMostComplete => {
86 completeness_score(&event) > completeness_score(existing)
87 }
88 };
89 if should_replace {
90 events_by_id.insert(event.id.clone(), event);
91 }
92 }
93 None => {
94 events_by_id.insert(event.id.clone(), event);
95 }
96 }
97 }
98 }
99
100 let mut result: Vec<EventEnvelope> = events_by_id.into_values().collect();
101 result.sort_by(|a, b| {
102 a.occurred_at
103 .cmp(&b.occurred_at)
104 .then_with(|| a.id.0.cmp(&b.id.0))
105 });
106 result
107}
108
109pub fn merge_two(
111 left: &[EventEnvelope],
112 right: &[EventEnvelope],
113 strategy: &MergeStrategy,
114) -> Vec<EventEnvelope> {
115 merge_events(vec![left.to_vec(), right.to_vec()], strategy)
116}
117
118pub fn merge_ingest_outputs(
120 ingest_outputs: &[IngestOutput],
121 resolution: ConflictResolution,
122) -> Result<MergeResult> {
123 if ingest_outputs.is_empty() {
124 return Err(anyhow!("No ingest outputs to merge"));
125 }
126
127 let base_coverage = &ingest_outputs[0].coverage;
128 let mut event_groups = Vec::with_capacity(ingest_outputs.len());
129 let mut all_sources = Vec::new();
130 let mut all_warnings = Vec::new();
131 let mut all_slices: Vec<CoverageSlice> = Vec::new();
132 let mut input_event_count = 0usize;
133
134 let mut all_freshness = Vec::new();
135 for ingest in ingest_outputs {
136 input_event_count += ingest.events.len();
137 event_groups.push(ingest.events.clone());
138 all_sources.extend(ingest.coverage.sources.clone());
139 all_warnings.extend(ingest.coverage.warnings.clone());
140 all_slices.extend(ingest.coverage.slices.clone());
141 all_freshness.extend(ingest.freshness.clone());
142 }
143
144 let merged_events = merge_events(event_groups, &resolution.into());
145 let mut coverage = CoverageManifest {
146 run_id: RunId::now("merge"),
147 generated_at: Utc::now(),
148 user: base_coverage.user.clone(),
149 window: base_coverage.window.clone(),
150 mode: "merged".to_string(),
151 sources: {
152 all_sources.sort();
153 all_sources.dedup();
154 all_sources
155 },
156 slices: all_slices,
157 warnings: {
158 if input_event_count > merged_events.len() {
159 let conflicts = input_event_count - merged_events.len();
160 all_warnings.push(format!(
161 "Resolved {} conflict(s) during merge using {:?} strategy",
162 conflicts, resolution,
163 ));
164 }
165 all_warnings
166 },
167 completeness: if ingest_outputs
168 .iter()
169 .any(|o| o.coverage.completeness == Completeness::Partial)
170 {
171 Completeness::Partial
172 } else {
173 Completeness::Complete
174 },
175 };
176
177 let conflict_count = input_event_count.saturating_sub(merged_events.len());
178 coverage.slices.sort_by_key(|slice| slice.window.since);
179
180 let report = MergeReport {
181 source_count: coverage.sources.len(),
182 input_event_count,
183 output_event_count: merged_events.len(),
184 conflict_count,
185 skipped_events: 0,
186 warning_count: coverage.warnings.len(),
187 };
188
189 Ok(MergeResult {
190 ingest_output: IngestOutput {
191 events: merged_events,
192 coverage,
193 freshness: all_freshness,
194 },
195 report,
196 })
197}
198
199pub fn merge_ingest_outputs_legacy(
204 ingest_outputs: &[IngestOutput],
205 resolution: ConflictResolution,
206) -> Result<IngestOutput> {
207 use std::collections::HashMap;
208
209 if ingest_outputs.is_empty() {
210 return Err(anyhow!("No ingest outputs to merge"));
211 }
212
213 let mut event_map: HashMap<String, Vec<EventEnvelope>> = HashMap::new();
214 let mut all_sources: Vec<String> = Vec::new();
215 let mut all_warnings: Vec<String> = Vec::new();
216 let mut all_slices: Vec<shiplog_schema::coverage::CoverageSlice> = Vec::new();
217 let mut all_freshness: Vec<shiplog_schema::freshness::SourceFreshness> = Vec::new();
218
219 let base_output = &ingest_outputs[0];
220 let window = base_output.coverage.window.clone();
221 let user = base_output.coverage.user.clone();
222
223 for ingest in ingest_outputs {
224 for event in &ingest.events {
225 let id = event.id.to_string();
226 event_map.entry(id).or_default().push(event.clone());
227 }
228
229 all_sources.extend(ingest.coverage.sources.clone());
230 all_warnings.extend(ingest.coverage.warnings.clone());
231 all_slices.extend(ingest.coverage.slices.clone());
232 all_freshness.extend(ingest.freshness.clone());
233 }
234
235 let mut merged_events: Vec<EventEnvelope> = Vec::new();
236 let mut conflict_count = 0usize;
237
238 for (_id, events) in event_map {
239 if events.len() == 1 {
240 merged_events.push(events[0].clone());
241 } else {
242 conflict_count += 1;
243 merged_events.push(resolve_conflict_legacy(&events, resolution));
244 }
245 }
246
247 merged_events.sort_by(|a, b| {
248 a.occurred_at
249 .cmp(&b.occurred_at)
250 .then_with(|| a.id.0.cmp(&b.id.0))
251 });
252 all_sources.sort();
253 all_sources.dedup();
254
255 let completeness = if ingest_outputs
256 .iter()
257 .any(|o| o.coverage.completeness == shiplog_schema::coverage::Completeness::Partial)
258 {
259 shiplog_schema::coverage::Completeness::Partial
260 } else {
261 shiplog_schema::coverage::Completeness::Complete
262 };
263
264 if conflict_count > 0 {
265 all_warnings.push(format!(
266 "Resolved {} conflict(s) during merge using {:?} strategy",
267 conflict_count, resolution,
268 ));
269 }
270
271 let coverage = shiplog_schema::coverage::CoverageManifest {
272 run_id: RunId::now("merge"),
273 generated_at: chrono::Utc::now(),
274 user,
275 window,
276 mode: "merged".to_string(),
277 sources: all_sources,
278 slices: all_slices,
279 warnings: all_warnings,
280 completeness,
281 };
282
283 Ok(IngestOutput {
284 events: merged_events,
285 coverage,
286 freshness: all_freshness,
287 })
288}
289
290fn resolve_conflict_legacy(
291 events: &[EventEnvelope],
292 resolution: ConflictResolution,
293) -> EventEnvelope {
294 match resolution {
295 ConflictResolution::PreferFirst => events[0].clone(),
296 ConflictResolution::PreferMostRecent => events
297 .iter()
298 .max_by_key(|e| e.occurred_at)
299 .cloned()
300 .unwrap(),
301 ConflictResolution::PreferMostComplete => events
302 .iter()
303 .max_by_key(|e| completeness_score_legacy(e))
304 .cloned()
305 .unwrap(),
306 }
307}
308
309fn completeness_score_legacy(event: &EventEnvelope) -> usize {
310 let mut score = 0;
311
312 if !event.actor.login.is_empty() {
314 score += 1;
315 }
316 if event.actor.id.is_some() {
317 score += 1;
318 }
319 if !event.repo.full_name.is_empty() {
320 score += 1;
321 }
322 if event.repo.html_url.is_some() {
323 score += 1;
324 }
325 if !event.tags.is_empty() {
326 score += 1;
327 }
328 if !event.links.is_empty() {
329 score += 1;
330 }
331 if event.source.url.is_some() {
332 score += 1;
333 }
334 if event.source.opaque_id.is_some() {
335 score += 1;
336 }
337
338 match &event.payload {
340 shiplog_schema::event::EventPayload::PullRequest(pr) => {
341 if pr.additions.is_some() {
342 score += 1;
343 }
344 if pr.deletions.is_some() {
345 score += 1;
346 }
347 if pr.changed_files.is_some() {
348 score += 1;
349 }
350 if pr.merged_at.is_some() {
351 score += 1;
352 }
353 }
354 shiplog_schema::event::EventPayload::Manual(manual) => {
355 if manual.description.is_some() {
356 score += 1;
357 }
358 if manual.started_at.is_some() {
359 score += 1;
360 }
361 if manual.ended_at.is_some() {
362 score += 1;
363 }
364 if manual.impact.is_some() {
365 score += 1;
366 }
367 }
368 _ => {}
369 }
370
371 score
372}
373
374fn completeness_score(event: &EventEnvelope) -> u32 {
376 let mut score = 0;
377
378 match &event.payload {
380 shiplog_schema::event::EventPayload::PullRequest(pr) => {
381 score += 10;
382 if pr.additions.is_some() {
383 score += 1;
384 }
385 if pr.deletions.is_some() {
386 score += 1;
387 }
388 if pr.changed_files.is_some() {
389 score += 1;
390 }
391 if !pr.touched_paths_hint.is_empty() {
392 score += 1;
393 }
394 }
395 shiplog_schema::event::EventPayload::Review(r) => {
396 score += 8;
397 if !r.pull_title.is_empty() {
398 score += 1;
399 }
400 }
401 shiplog_schema::event::EventPayload::Manual(m) => {
402 score += 5;
403 if m.description.is_some() {
404 score += 2;
405 }
406 if m.impact.is_some() {
407 score += 2;
408 }
409 }
410 }
411
412 if event.source.url.is_some() {
414 score += 1;
415 }
416 if event.source.opaque_id.is_some() {
417 score += 1;
418 }
419
420 if !event.links.is_empty() {
422 score += 2;
423 }
424
425 if !event.tags.is_empty() {
427 score += 1;
428 }
429
430 score
431}
432
// Unit tests, insta snapshot tests, and proptest property tests for the merge
// functions in this file.
#[cfg(test)]
mod tests {
    use super::*;
    use chrono::{NaiveDate, TimeZone, Utc};
    use shiplog_ids::EventId;
    use shiplog_schema::coverage::{CoverageManifest, CoverageSlice, TimeWindow};
    use shiplog_schema::event::{
        Actor, EventKind, EventPayload, ManualEvent, ManualEventType, RepoRef, RepoVisibility,
        SourceRef, SourceSystem,
    };
    /// Builds a manual "note" event with the given id and timestamp.
    ///
    /// Actor and repo are filled in. The payload's optional fields, tags, links,
    /// and the source url/opaque id are all left empty, so callers can enrich
    /// a copy to make a "more complete" version of the same event.
    fn make_event(id: &str, occurred_at: chrono::DateTime<chrono::Utc>) -> EventEnvelope {
        EventEnvelope {
            id: EventId::from_parts([id]),
            kind: EventKind::Manual,
            occurred_at,
            actor: Actor {
                login: "testuser".to_string(),
                id: Some(123),
            },
            repo: RepoRef {
                full_name: "owner/test".to_string(),
                html_url: Some("https://github.com/owner/test".to_string()),
                visibility: RepoVisibility::Public,
            },
            payload: EventPayload::Manual(ManualEvent {
                event_type: ManualEventType::Note,
                title: "Test event".to_string(),
                description: None,
                started_at: None,
                ended_at: None,
                impact: None,
            }),
            tags: vec![],
            links: vec![],
            source: SourceRef {
                system: SourceSystem::Manual,
                url: None,
                opaque_id: None,
            },
        }
    }

    /// Same as [`make_event`], but with `tags` attached.
    fn make_event_with_tags(
        id: &str,
        occurred_at: chrono::DateTime<chrono::Utc>,
        tags: Vec<String>,
    ) -> EventEnvelope {
        let mut e = make_event(id, occurred_at);
        e.tags = tags;
        e
    }

    /// Builds a coverage manifest for 2025-01-01..2025-02-01 with one source and
    /// one warning. The warning is present even when `warning` is empty. There is
    /// one slice whose `total_count` and `fetched` are both `w`.
    fn coverage(
        w: usize,
        completeness: Completeness,
        source: &str,
        warning: &str,
    ) -> CoverageManifest {
        CoverageManifest {
            run_id: RunId::now("test"),
            generated_at: Utc.timestamp_nanos(1),
            user: "tester".to_string(),
            window: TimeWindow {
                since: NaiveDate::from_ymd_opt(2025, 1, 1).unwrap(),
                until: NaiveDate::from_ymd_opt(2025, 2, 1).unwrap(),
            },
            mode: "merged".to_string(),
            sources: vec![source.to_string()],
            slices: vec![CoverageSlice {
                window: TimeWindow {
                    since: NaiveDate::from_ymd_opt(2025, 1, 1).unwrap(),
                    until: NaiveDate::from_ymd_opt(2025, 2, 1).unwrap(),
                },
                query: "q".to_string(),
                total_count: w as u64,
                fetched: w as u64,
                incomplete_results: None,
                notes: vec![],
            }],
            warnings: vec![warning.to_string()],
            completeness,
        }
    }

    #[test]
    fn merge_empty_sources() {
        let result = merge_events(vec![], &MergeStrategy::default());
        assert!(result.is_empty());
    }

    #[test]
    fn merge_single_source() {
        let events = vec![
            make_event("1", Utc.with_ymd_and_hms(2025, 1, 1, 0, 0, 0).unwrap()),
            make_event("2", Utc.with_ymd_and_hms(2025, 1, 2, 0, 0, 0).unwrap()),
        ];
        let result = merge_events(vec![events], &MergeStrategy::default());
        assert_eq!(result.len(), 2);
    }

    // Same id in two sources: only one survives, and KeepLast picks the newer one.
    #[test]
    fn merge_deduplicates_by_id() {
        let event1 = make_event("1", Utc.with_ymd_and_hms(2025, 1, 1, 0, 0, 0).unwrap());
        let event2 = make_event("1", Utc.with_ymd_and_hms(2025, 1, 2, 0, 0, 0).unwrap());

        let result = merge_events(
            vec![vec![event1.clone()], vec![event2.clone()]],
            &MergeStrategy::KeepLast,
        );
        assert_eq!(result.len(), 1);
        assert_eq!(result[0].occurred_at, event2.occurred_at);
    }

    // KeepFirst retains the earlier-sourced copy even though the later one is newer.
    #[test]
    fn merge_keeps_first_strategy() {
        let event1 = make_event("1", Utc.with_ymd_and_hms(2025, 1, 1, 0, 0, 0).unwrap());
        let event2 = make_event("1", Utc.with_ymd_and_hms(2025, 1, 2, 0, 0, 0).unwrap());

        let result = merge_events(
            vec![vec![event1.clone()], vec![event2]],
            &MergeStrategy::KeepFirst,
        );
        assert_eq!(result.len(), 1);
        assert_eq!(result[0].occurred_at, event1.occurred_at);
    }

    #[test]
    fn merge_keeps_last_strategy() {
        let event1 = make_event("1", Utc.with_ymd_and_hms(2025, 1, 1, 0, 0, 0).unwrap());
        let event2 = make_event("1", Utc.with_ymd_and_hms(2025, 1, 2, 0, 0, 0).unwrap());

        let result = merge_events(
            vec![vec![event1], vec![event2.clone()]],
            &MergeStrategy::KeepLast,
        );
        assert_eq!(result.len(), 1);
        assert_eq!(result[0].occurred_at, event2.occurred_at);
    }

    #[test]
    fn merge_two_helper() {
        let left = vec![make_event(
            "1",
            Utc.with_ymd_and_hms(2025, 1, 1, 0, 0, 0).unwrap(),
        )];
        let right = vec![make_event(
            "2",
            Utc.with_ymd_and_hms(2025, 1, 2, 0, 0, 0).unwrap(),
        )];

        let result = merge_two(&left, &right, &MergeStrategy::default());
        assert_eq!(result.len(), 2);
    }

    #[test]
    fn merge_result_is_sorted() {
        let events = vec![
            make_event("a", Utc.with_ymd_and_hms(2025, 1, 3, 0, 0, 0).unwrap()),
            make_event("b", Utc.with_ymd_and_hms(2025, 1, 1, 0, 0, 0).unwrap()),
            make_event("c", Utc.with_ymd_and_hms(2025, 1, 2, 0, 0, 0).unwrap()),
        ];

        let result = merge_events(vec![events], &MergeStrategy::default());

        assert_eq!(result.len(), 3);
        // Input was deliberately out of order; output must be ascending by time.
        assert!(result[0].occurred_at <= result[1].occurred_at);
        assert!(result[1].occurred_at <= result[2].occurred_at);
    }

    // `b` appears in both inputs: 4 input events -> 3 output events, 1 conflict.
    // Warnings = one per input plus the appended conflict note. One Partial input
    // makes the whole merge Partial.
    #[test]
    fn merge_ingest_outputs_unifies_coverage_and_events() {
        let ingest_a = IngestOutput {
            events: vec![
                make_event("a", Utc.with_ymd_and_hms(2025, 1, 1, 0, 0, 0).unwrap()),
                make_event("b", Utc.with_ymd_and_hms(2025, 1, 1, 1, 0, 0).unwrap()),
            ],
            coverage: coverage(2, Completeness::Partial, "github", "a.warning"),
            freshness: Vec::new(),
        };
        let ingest_b = IngestOutput {
            events: vec![
                make_event("b", Utc.with_ymd_and_hms(2025, 1, 1, 2, 0, 0).unwrap()),
                make_event("c", Utc.with_ymd_and_hms(2025, 1, 1, 3, 0, 0).unwrap()),
            ],
            coverage: coverage(2, Completeness::Complete, "local_git", "b.warning"),
            freshness: Vec::new(),
        };

        let merged =
            merge_ingest_outputs(&[ingest_a, ingest_b], ConflictResolution::PreferMostRecent)
                .unwrap();

        assert_eq!(merged.ingest_output.events.len(), 3);
        assert_eq!(merged.report.conflict_count, 1);
        assert_eq!(merged.ingest_output.coverage.sources.len(), 2);
        assert_eq!(merged.ingest_output.coverage.warnings.len(), 3);
        assert_eq!(
            merged.ingest_output.coverage.completeness,
            Completeness::Partial
        );
        assert_eq!(merged.ingest_output.coverage.mode, "merged");
    }

    #[test]
    fn merge_ingest_outputs_rejects_empty_input() {
        let err = merge_ingest_outputs(&[], ConflictResolution::PreferMostRecent)
            .expect_err("expected empty input error");
        assert!(
            err.to_string().contains("No ingest outputs to merge"),
            "{err}"
        );
    }

    // ---- Edge cases for `merge_events` / `merge_two` ----
    #[test]
    fn merge_multiple_sources_no_overlap() {
        let t1 = Utc.with_ymd_and_hms(2025, 1, 1, 0, 0, 0).unwrap();
        let t2 = Utc.with_ymd_and_hms(2025, 1, 2, 0, 0, 0).unwrap();
        let t3 = Utc.with_ymd_and_hms(2025, 1, 3, 0, 0, 0).unwrap();
        let result = merge_events(
            vec![
                vec![make_event("a", t1)],
                vec![make_event("b", t2)],
                vec![make_event("c", t3)],
            ],
            &MergeStrategy::default(),
        );
        assert_eq!(result.len(), 3);
        assert!(result[0].occurred_at <= result[1].occurred_at);
        assert!(result[1].occurred_at <= result[2].occurred_at);
    }

    #[test]
    fn merge_all_duplicates() {
        let t = Utc.with_ymd_and_hms(2025, 1, 1, 0, 0, 0).unwrap();
        let e = make_event("same", t);
        let result = merge_events(
            vec![vec![e.clone()], vec![e.clone()], vec![e.clone()]],
            &MergeStrategy::KeepFirst,
        );
        assert_eq!(result.len(), 1);
    }

    // Identical timestamps; the tagged copy scores higher and must win.
    #[test]
    fn merge_keeps_most_complete_strategy() {
        let t = Utc.with_ymd_and_hms(2025, 1, 1, 0, 0, 0).unwrap();
        let sparse = make_event("x", t);
        let rich = make_event_with_tags("x", t, vec!["tag1".into(), "tag2".into()]);
        let result = merge_events(
            vec![vec![sparse], vec![rich.clone()]],
            &MergeStrategy::KeepMostComplete,
        );
        assert_eq!(result.len(), 1);
        assert_eq!(result[0].tags, rich.tags);
    }

    #[test]
    fn merge_preserves_order_same_timestamp() {
        let t = Utc.with_ymd_and_hms(2025, 1, 1, 0, 0, 0).unwrap();
        let events = vec![make_event("z", t), make_event("a", t), make_event("m", t)];
        let result = merge_events(vec![events], &MergeStrategy::default());
        assert_eq!(result.len(), 3);
        for w in result.windows(2) {
            // Ascending by time; equal timestamps are tie-broken by event id.
            assert!(w[0].occurred_at <= w[1].occurred_at);
            if w[0].occurred_at == w[1].occurred_at {
                assert!(w[0].id.0 <= w[1].id.0);
            }
        }
    }

    #[test]
    fn merge_single_event_per_source() {
        let t1 = Utc.with_ymd_and_hms(2025, 1, 1, 0, 0, 0).unwrap();
        let t2 = Utc.with_ymd_and_hms(2025, 1, 2, 0, 0, 0).unwrap();
        let result = merge_events(
            vec![vec![make_event("a", t1)], vec![make_event("b", t2)]],
            &MergeStrategy::default(),
        );
        assert_eq!(result.len(), 2);
    }

    #[test]
    fn merge_two_empty_lists() {
        let result = merge_two(&[], &[], &MergeStrategy::default());
        assert!(result.is_empty());
    }

    #[test]
    fn merge_two_one_empty() {
        let t = Utc.with_ymd_and_hms(2025, 1, 1, 0, 0, 0).unwrap();
        let events = vec![make_event("a", t)];
        let result = merge_two(&events, &[], &MergeStrategy::default());
        assert_eq!(result.len(), 1);
    }

    #[test]
    fn merge_ingest_outputs_single_source() {
        let ingest = IngestOutput {
            events: vec![make_event(
                "a",
                Utc.with_ymd_and_hms(2025, 1, 1, 0, 0, 0).unwrap(),
            )],
            coverage: coverage(1, Completeness::Complete, "github", ""),
            freshness: Vec::new(),
        };
        let merged = merge_ingest_outputs(&[ingest], ConflictResolution::PreferFirst).unwrap();
        assert_eq!(merged.ingest_output.events.len(), 1);
        assert_eq!(merged.report.conflict_count, 0);
    }

    #[test]
    fn merge_ingest_outputs_all_complete() {
        let ingest_a = IngestOutput {
            events: vec![make_event(
                "a",
                Utc.with_ymd_and_hms(2025, 1, 1, 0, 0, 0).unwrap(),
            )],
            coverage: coverage(1, Completeness::Complete, "github", ""),
            freshness: Vec::new(),
        };
        let ingest_b = IngestOutput {
            events: vec![make_event(
                "b",
                Utc.with_ymd_and_hms(2025, 1, 2, 0, 0, 0).unwrap(),
            )],
            coverage: coverage(1, Completeness::Complete, "local_git", ""),
            freshness: Vec::new(),
        };
        let merged =
            merge_ingest_outputs(&[ingest_a, ingest_b], ConflictResolution::PreferFirst).unwrap();
        assert_eq!(
            merged.ingest_output.coverage.completeness,
            Completeness::Complete
        );
    }

    #[test]
    fn merge_legacy_rejects_empty_input() {
        let err = merge_ingest_outputs_legacy(&[], ConflictResolution::PreferFirst)
            .expect_err("expected error");
        assert!(err.to_string().contains("No ingest outputs to merge"));
    }

    // The same ingest output passed twice must collapse to a single event.
    #[test]
    fn merge_legacy_deduplicates() {
        let t = Utc.with_ymd_and_hms(2025, 1, 1, 0, 0, 0).unwrap();
        let ingest = IngestOutput {
            events: vec![make_event("a", t)],
            coverage: coverage(1, Completeness::Complete, "github", ""),
            freshness: Vec::new(),
        };
        let merged =
            merge_ingest_outputs_legacy(&[ingest.clone(), ingest], ConflictResolution::PreferFirst)
                .unwrap();
        assert_eq!(merged.events.len(), 1);
    }

    #[test]
    fn conflict_resolution_to_merge_strategy_mapping() {
        let s: MergeStrategy = ConflictResolution::PreferFirst.into();
        assert!(matches!(s, MergeStrategy::KeepFirst));
        let s: MergeStrategy = ConflictResolution::PreferMostRecent.into();
        assert!(matches!(s, MergeStrategy::KeepLast));
        let s: MergeStrategy = ConflictResolution::PreferMostComplete.into();
        assert!(matches!(s, MergeStrategy::KeepMostComplete));
    }

    // Snapshot of the report for a merge where `shared` appears in both inputs
    // (different timestamps). NOTE(review): insta presumably keys the stored
    // snapshot on this test's name; verify before renaming.
    #[test]
    fn snapshot_merge_report() {
        let t1 = Utc.with_ymd_and_hms(2025, 1, 1, 0, 0, 0).unwrap();
        let t2 = Utc.with_ymd_and_hms(2025, 1, 2, 0, 0, 0).unwrap();
        let ingest_a = IngestOutput {
            events: vec![make_event("a", t1), make_event("shared", t1)],
            coverage: coverage(2, Completeness::Complete, "github", "warn-a"),
            freshness: Vec::new(),
        };
        let ingest_b = IngestOutput {
            events: vec![make_event("shared", t2), make_event("b", t2)],
            coverage: coverage(2, Completeness::Complete, "local_git", "warn-b"),
            freshness: Vec::new(),
        };
        let merged =
            merge_ingest_outputs(&[ingest_a, ingest_b], ConflictResolution::PreferMostRecent)
                .unwrap();
        insta::assert_debug_snapshot!(merged.report);
    }

    // Snapshot of the surviving ids, in output order, when `beta` is duplicated.
    #[test]
    fn snapshot_merged_event_ids() {
        let t1 = Utc.with_ymd_and_hms(2025, 1, 1, 0, 0, 0).unwrap();
        let t2 = Utc.with_ymd_and_hms(2025, 1, 2, 0, 0, 0).unwrap();
        let t3 = Utc.with_ymd_and_hms(2025, 1, 3, 0, 0, 0).unwrap();
        let result = merge_events(
            vec![
                vec![make_event("alpha", t1), make_event("beta", t2)],
                vec![make_event("beta", t2), make_event("gamma", t3)],
            ],
            &MergeStrategy::KeepFirst,
        );
        let ids: Vec<&str> = result.iter().map(|e| e.id.0.as_str()).collect();
        insta::assert_debug_snapshot!(ids);
    }

    // Property-based tests over small, generated event lists.
    mod prop {
        use super::*;
        use proptest::prelude::*;
        use std::collections::HashSet;

        proptest! {
            // Merging can only remove events, never add them.
            #[test]
            fn merge_output_length_le_input(n1 in 0usize..5, n2 in 0usize..5) {
                let t = Utc.with_ymd_and_hms(2025, 1, 1, 0, 0, 0).unwrap();
                let src1: Vec<_> = (0..n1).map(|i| make_event(&format!("a{i}"), t)).collect();
                let src2: Vec<_> = (0..n2).map(|i| make_event(&format!("b{i}"), t)).collect();
                let result = merge_events(vec![src1, src2], &MergeStrategy::KeepFirst);
                prop_assert!(result.len() <= n1 + n2);
            }

            // `day` stays within 1..=28 so every generated date is valid.
            #[test]
            fn merge_output_always_sorted(n in 1usize..8) {
                let events: Vec<_> = (0..n).map(|i| {
                    let day = (i % 28) as u32 + 1;
                    make_event(&format!("e{i}"), Utc.with_ymd_and_hms(2025, 1, day, 0, 0, 0).unwrap())
                }).collect();
                let result = merge_events(vec![events], &MergeStrategy::default());
                for w in result.windows(2) {
                    prop_assert!(w[0].occurred_at <= w[1].occurred_at);
                }
            }

            // Re-merging an already-merged list yields the same ids in the same order.
            #[test]
            fn merge_is_idempotent(n in 1usize..5) {
                let t = Utc.with_ymd_and_hms(2025, 1, 1, 0, 0, 0).unwrap();
                let events: Vec<_> = (0..n).map(|i| make_event(&format!("e{i}"), t)).collect();
                let first = merge_events(vec![events.clone()], &MergeStrategy::KeepFirst);
                let second = merge_events(vec![first.clone()], &MergeStrategy::KeepFirst);
                prop_assert_eq!(first.len(), second.len());
                for (a, b) in first.iter().zip(second.iter()) {
                    prop_assert_eq!(&a.id, &b.id);
                }
            }

            // With disjoint id prefixes, every input id must appear in the output.
            #[test]
            fn all_unique_ids_preserved(n1 in 0usize..4, n2 in 0usize..4) {
                let t = Utc.with_ymd_and_hms(2025, 1, 1, 0, 0, 0).unwrap();
                let src1: Vec<_> = (0..n1).map(|i| make_event(&format!("s1_{i}"), t)).collect();
                let src2: Vec<_> = (0..n2).map(|i| make_event(&format!("s2_{i}"), t)).collect();
                let mut all_ids: HashSet<_> = src1.iter().map(|e| e.id.clone()).collect();
                all_ids.extend(src2.iter().map(|e| e.id.clone()));
                let result = merge_events(vec![src1, src2], &MergeStrategy::KeepFirst);
                let result_ids: HashSet<_> = result.iter().map(|e| e.id.clone()).collect();
                prop_assert_eq!(all_ids, result_ids);
            }
        }
    }
}