Skip to main content

parlov_analysis/aggregation/
reducer.rs

//! Offline log-odds reducer for evidence events.
//!
//! Replaces the online, order-dependent accumulator with a pure function over the full event
//! set: group events by `(family, polarity)`, sort each group by weight descending, apply a
//! polarity-specific diminishing-returns schedule, clamp each group's discounted sum to a
//! per-group cap, and add up the clamped group totals. Order-invariant by construction.

8use std::cmp::Ordering;
9
10use indexmap::IndexMap;
11
12use crate::existence::families::SignalFamily;
13
/// Direction in which a single piece of evidence pushes the hypothesis.
///
/// Events are grouped by `(family, polarity)`, so the two variants never share a
/// diminishing-returns schedule or a cap.
#[derive(Clone, Copy, Debug, Eq, Hash, PartialEq)]
pub enum EvidencePolarity {
    /// Supports the hypothesis; its log-odds contribution is non-negative.
    Positive,
    /// Opposes the hypothesis; its log-odds contribution is non-positive.
    Contradictory,
}
22
23/// A single evidence event collected for offline aggregation.
24///
25/// Fields are crate-private; construct events via [`EvidenceEvent::positive`] or
26/// [`EvidenceEvent::contradictory`] so `signed_log_odds`'s sign always agrees with `polarity`.
27#[derive(Debug, Clone)]
28pub struct EvidenceEvent {
29    pub(crate) family: SignalFamily,
30    pub(crate) polarity: EvidencePolarity,
31    pub(crate) technique_id: String,
32    pub(crate) weight: f64,
33    pub(crate) signed_log_odds: f64,
34}
35
36impl EvidenceEvent {
37    /// Positive event; `signed_log_odds` is forced to `log_odds_magnitude.abs()`.
38    #[must_use]
39    pub fn positive(
40        family: SignalFamily,
41        technique_id: impl Into<String>,
42        weight: f64,
43        log_odds_magnitude: f64,
44    ) -> Self {
45        Self {
46            family,
47            polarity: EvidencePolarity::Positive,
48            technique_id: technique_id.into(),
49            weight,
50            signed_log_odds: log_odds_magnitude.abs(),
51        }
52    }
53
54    /// Contradictory event; `signed_log_odds` is forced to `-log_odds_magnitude.abs()`.
55    #[must_use]
56    pub fn contradictory(
57        family: SignalFamily,
58        technique_id: impl Into<String>,
59        weight: f64,
60        log_odds_magnitude: f64,
61    ) -> Self {
62        Self {
63            family,
64            polarity: EvidencePolarity::Contradictory,
65            technique_id: technique_id.into(),
66            weight,
67            signed_log_odds: -log_odds_magnitude.abs(),
68        }
69    }
70}
71
/// Slot multipliers for `Positive` evidence, applied to a group after sorting by weight.
///
/// Slot `k` scales the `k`-th event of the sorted group; events past the last slot contribute 0.
pub const POSITIVE_SCHEDULE: &[f64] = &[1.0_f64, 0.5, 0.25, 0.1];

/// Slot multipliers for `Contradictory` evidence.
///
/// This schedule decays more slowly than [`POSITIVE_SCHEDULE`] and has one more slot, so
/// additional contradictory events keep more of their log-odds.
pub const CONTRADICTORY_SCHEDULE: &[f64] = &[1.0_f64, 0.7, 0.5, 0.3, 0.1];

/// Largest absolute log-odds a single `(family, polarity)` group may contribute.
///
/// A group's discounted sum is clamped to `±PER_GROUP_CAP` before being added to the total.
pub const PER_GROUP_CAP: f64 = 0.75_f64;
80
81/// Reduces a single (family, polarity) group of events to a discounted log-odds contribution.
82///
83/// Sorts events by `weight` descending (with `technique_id` ascending as tiebreaker for
84/// determinism), applies the schedule slot by slot (events beyond the schedule length contribute
85/// 0), and clamps the magnitude to `cap`.
86#[must_use]
87pub fn reduce_family_polarity(events: &[EvidenceEvent], schedule: &[f64], cap: f64) -> f64 {
88    if events.is_empty() || schedule.is_empty() {
89        return 0.0;
90    }
91
92    let mut sorted: Vec<&EvidenceEvent> = events.iter().collect();
93    sorted.sort_by(|a, b| cmp_event_desc(a, b));
94
95    let total: f64 = sorted
96        .iter()
97        .zip(schedule.iter())
98        .map(|(event, multiplier)| event.signed_log_odds * multiplier)
99        .sum();
100
101    clamp_magnitude(total, cap)
102}
103
104/// Total log-odds across all `(family, polarity)` groups.
105///
106/// Equivalent to `reduce_with_attribution(events).total_log_odds` but skips the per-event
107/// contribution allocation.
108#[must_use]
109pub fn reduce_all(events: &[EvidenceEvent]) -> f64 {
110    if events.is_empty() {
111        return 0.0;
112    }
113    let mut total = 0.0;
114    for (group, polarity) in group_indices_by_family_polarity(events) {
115        let schedule = schedule_for(polarity);
116        let unclamped: f64 = sorted_indices(events, &group)
117            .into_iter()
118            .zip(schedule.iter())
119            .map(|(i, m)| events[i].signed_log_odds * m)
120            .sum();
121        total += clamp_magnitude(unclamped, PER_GROUP_CAP);
122    }
123    total
124}
125
126/// Sorts a list of input indices by the events' weight descending, with `technique_id`
127/// ascending as a deterministic tiebreaker.
128fn sorted_indices(events: &[EvidenceEvent], indices: &[usize]) -> Vec<usize> {
129    let mut sorted = indices.to_vec();
130    sorted.sort_by(|&a, &b| cmp_event_desc(&events[a], &events[b]));
131    sorted
132}
133
/// Outcome of [`reduce_with_attribution`]: the aggregate log-odds plus a per-event breakdown.
#[derive(Clone, Debug)]
pub struct ReductionResult {
    /// Sum of every `(family, polarity)` group's clamped contribution.
    pub total_log_odds: f64,
    /// Share of `total_log_odds` attributed to each input event.
    ///
    /// `contributions[i]` belongs to `events[i]`. The entries add up to `total_log_odds` up to
    /// floating-point rounding.
    pub contributions: Vec<f64>,
}
143
144/// Reduces all events and reports per-event log-odds attribution under the same sort, schedule,
145/// and cap rules as [`reduce_all`]. `contributions[i]` corresponds to `events[i]`. When a group
146/// exceeds [`PER_GROUP_CAP`], every event in that group is scaled by `cap / |unclamped_sum|`
147/// so the group's contributions sum exactly to the signed cap.
148#[must_use]
149pub fn reduce_with_attribution(events: &[EvidenceEvent]) -> ReductionResult {
150    if events.is_empty() {
151        return ReductionResult {
152            total_log_odds: 0.0,
153            contributions: Vec::new(),
154        };
155    }
156
157    let mut contributions = vec![0.0_f64; events.len()];
158    let mut total = 0.0_f64;
159    for (group, polarity) in group_indices_by_family_polarity(events) {
160        let schedule = schedule_for(polarity);
161        let group_total = attribute_group(events, &group, schedule, &mut contributions);
162        total += group_total;
163    }
164    ReductionResult {
165        total_log_odds: total,
166        contributions,
167    }
168}
169
170/// Computes per-event contributions for one `(family, polarity)` group and writes them into
171/// `contributions` at the original input indices. Returns the group's clamped total.
172fn attribute_group(
173    events: &[EvidenceEvent],
174    group_indices: &[usize],
175    schedule: &[f64],
176    contributions: &mut [f64],
177) -> f64 {
178    if group_indices.is_empty() || schedule.is_empty() {
179        return 0.0;
180    }
181
182    let sorted = sorted_indices(events, group_indices);
183    let unclamped: f64 = sorted
184        .iter()
185        .zip(schedule.iter())
186        .map(|(&i, m)| events[i].signed_log_odds * m)
187        .sum();
188    let clamped = clamp_magnitude(unclamped, PER_GROUP_CAP);
189    let scale = if (clamped - unclamped).abs() <= f64::EPSILON || unclamped.abs() <= f64::EPSILON {
190        1.0
191    } else {
192        clamped / unclamped
193    };
194
195    for (slot, &idx) in sorted.iter().enumerate() {
196        let multiplier = schedule.get(slot).copied().unwrap_or(0.0);
197        contributions[idx] = events[idx].signed_log_odds * multiplier * scale;
198    }
199    clamped
200}
201
202/// Groups input indices by `(family, polarity)`, preserving first-occurrence order.
203fn group_indices_by_family_polarity(
204    events: &[EvidenceEvent],
205) -> Vec<(Vec<usize>, EvidencePolarity)> {
206    let mut groups: IndexMap<(SignalFamily, EvidencePolarity), Vec<usize>> = IndexMap::new();
207    for (i, event) in events.iter().enumerate() {
208        groups
209            .entry((event.family, event.polarity))
210            .or_default()
211            .push(i);
212    }
213    groups
214        .into_iter()
215        .map(|((_, polarity), indices)| (indices, polarity))
216        .collect()
217}
218
219/// Returns the schedule corresponding to a polarity.
220fn schedule_for(polarity: EvidencePolarity) -> &'static [f64] {
221    match polarity {
222        EvidencePolarity::Positive => POSITIVE_SCHEDULE,
223        EvidencePolarity::Contradictory => CONTRADICTORY_SCHEDULE,
224    }
225}
226
227/// Sort comparator: weight descending, then `technique_id` ascending. NaN-safe.
228fn cmp_event_desc(a: &EvidenceEvent, b: &EvidenceEvent) -> Ordering {
229    match b.weight.partial_cmp(&a.weight) {
230        Some(Ordering::Equal) | None => a.technique_id.cmp(&b.technique_id),
231        Some(other) => other,
232    }
233}
234
/// Limits `value` to the interval `[-|cap|, |cap|]`; the sign of `cap` is ignored.
///
/// Uses plain comparisons rather than `f64::clamp`, so a NaN `value` passes through unchanged
/// and a NaN `cap` disables clamping instead of panicking.
fn clamp_magnitude(value: f64, cap: f64) -> f64 {
    let limit = cap.abs();
    match value {
        v if v > limit => limit,
        v if v < -limit => -limit,
        v => v,
    }
}
246
247#[cfg(test)]
248#[path = "reducer_tests.rs"]
249mod tests;