libdd_profiling/internal/
upscaling.rs

1// Copyright 2023-Present Datadog, Inc. https://www.datadoghq.com/
2// SPDX-License-Identifier: Apache-2.0
3
4use super::*;
5use crate::api::UpscalingInfo;
6use anyhow::Context;
7
8#[derive(Debug)]
9pub struct UpscalingRule {
10    upscaling_info: UpscalingInfo,
11    values_offset: Vec<usize>,
12}
13
14impl UpscalingRule {
15    pub fn compute_scale(&self, values: &[i64]) -> f64 {
16        match self.upscaling_info {
17            UpscalingInfo::Poisson {
18                sum_value_offset,
19                count_value_offset,
20                sampling_distance,
21            } => {
22                // This should not happen, but if it happens,
23                // do not upscale
24                if values[sum_value_offset] == 0 || values[count_value_offset] == 0 {
25                    return 1_f64;
26                }
27
28                let avg = values[sum_value_offset] as f64 / values[count_value_offset] as f64;
29                1_f64 / (1_f64 - (-avg / sampling_distance as f64).exp())
30            }
31            UpscalingInfo::PoissonNonSampleTypeCount {
32                sum_value_offset,
33                count_value,
34                sampling_distance,
35            } => {
36                // This should not happen, but if it happens,
37                // do not upscale
38                if values[sum_value_offset] == 0 || count_value == 0 {
39                    return 1_f64;
40                }
41
42                let avg = values[sum_value_offset] as f64 / count_value as f64;
43                1_f64 / (1_f64 - (-avg / sampling_distance as f64).exp())
44            }
45            UpscalingInfo::Proportional { scale } => scale,
46        }
47    }
48
49    pub fn new(values_offset: Vec<usize>, upscaling_info: UpscalingInfo) -> Self {
50        Self {
51            values_offset,
52            upscaling_info,
53        }
54    }
55}
56
57#[derive(Default)]
58pub struct UpscalingRules {
59    rules: FxIndexMap<(StringId, StringId), Vec<UpscalingRule>>,
60    // this is just an optimization in the case where we check collisions (when adding
61    // a by-value rule) against by-label rules
62    // 32 should be enough for the size of the bitmap
63    offset_modified_by_bylabel_rule: bitmaps::Bitmap<32>,
64}
65
66impl UpscalingRules {
67    pub fn add(
68        &mut self,
69        values_offset: &[usize],
70        label_name: (&str, StringId),
71        label_value: (&str, StringId),
72        upscaling_info: UpscalingInfo,
73        max_offset: usize,
74    ) -> anyhow::Result<()> {
75        anyhow::ensure!(
76            values_offset.iter().all(|x| *x < max_offset),
77            "Invalid offset. Highest expected offset: {max_offset}",
78        );
79
80        let mut new_values_offset = values_offset.to_vec();
81        new_values_offset.sort_unstable();
82
83        self.check_collisions(&new_values_offset, label_name, label_value, &upscaling_info)?;
84        upscaling_info.check_validity(max_offset)?;
85        let rule: UpscalingRule = UpscalingRule::new(new_values_offset, upscaling_info);
86
87        let label_name_id = label_name.1;
88        let label_value_id = label_value.1;
89        if !label_name_id.is_zero() || !label_value_id.is_zero() {
90            rule.values_offset.iter().for_each(|offset| {
91                self.offset_modified_by_bylabel_rule.set(*offset, true);
92            })
93        }
94        match self.rules.get_index_of(&(label_name_id, label_value_id)) {
95            None => {
96                let rules = vec![rule];
97                self.rules.insert((label_name_id, label_value_id), rules);
98            }
99            Some(index) => {
100                let (_, rules) = self.rules.get_index_mut(index).with_context(|| {
101                    format!("Expected upscaling rules to exist for index {index}")
102                })?;
103                rules.push(rule);
104            }
105        };
106        Ok(())
107    }
108
109    fn check_collisions(
110        &self,
111        values_offset: &[usize],
112        label_name: (&str, StringId),
113        label_value: (&str, StringId),
114        upscaling_info: &UpscalingInfo,
115    ) -> anyhow::Result<()> {
116        // Check for duplicates
117        fn is_overlapping(v1: &[usize], v2: &[usize]) -> bool {
118            v1.iter().any(|x| v2.contains(x))
119        }
120        let (label_name_str, label_name_id) = label_name;
121        let (label_value_str, label_value_id) = label_value;
122
123        let colliding_rule = match self.rules.get(&(label_name_id, label_value_id)) {
124            Some(rules) => rules
125                .iter()
126                .find(|rule| is_overlapping(&rule.values_offset, values_offset)),
127            None => None,
128        };
129
130        anyhow::ensure!(
131            colliding_rule.is_none(),
132            "There are duplicated by-label rules for the same label name: {label_name_str} with at least one value offset in common.\n\
133            Existing rule {colliding_rule:?}\n\
134            New rule {label_name_str} {label_value_str} {values_offset:?} {upscaling_info:?}"
135        );
136
137        // if we are adding a by-value rule, we need to check against
138        // all by-label rules for collisions
139        if label_name.1.is_zero() && label_value.1.is_zero() {
140            let collision_offset = values_offset
141                .iter()
142                .find(|offset| self.offset_modified_by_bylabel_rule.get(**offset));
143
144            anyhow::ensure!(
145                collision_offset.is_none(),
146                "The by-value rule is colliding with at least one by-label rule at offset {collision_offset:?}\n\
147                by-value rule values offset(s) {values_offset:?}",
148            )
149        } else if let Some(rules) = self.rules.get(&(StringId::ZERO, StringId::ZERO)) {
150            let collide_with_byvalue_rule = rules
151                .iter()
152                .find(|rule| is_overlapping(&rule.values_offset, values_offset));
153            anyhow::ensure!(collide_with_byvalue_rule.is_none(),
154                "The by-label rule (label name {label_name_str}, label value {label_value_str}) is colliding with a by-value rule on values offsets\n\
155                Existing values offset(s) {collide_with_byvalue_rule:?}, new rule values offset(s) {values_offset:?}");
156        }
157        Ok(())
158    }
159
160    pub fn get(&self, k: &(StringId, StringId)) -> Option<&Vec<UpscalingRule>> {
161        self.rules.get(k)
162    }
163
164    pub fn is_empty(&self) -> bool {
165        self.rules.is_empty()
166    }
167
168    pub fn upscale_values(&self, values: &mut [i64], labels: &[Label]) -> anyhow::Result<()> {
169        if !self.is_empty() {
170            // get bylabel rules first (if any)
171            let mut group_of_rules = labels
172                .iter()
173                .filter_map(|label| {
174                    self.get(&(
175                        label.get_key(),
176                        match label.get_value() {
177                            LabelValue::Str(str) => *str,
178                            LabelValue::Num { .. } => StringId::ZERO,
179                        },
180                    ))
181                })
182                .collect::<Vec<&Vec<UpscalingRule>>>();
183
184            // get byvalue rules if any
185            if let Some(byvalue_rules) = self.get(&(StringId::ZERO, StringId::ZERO)) {
186                group_of_rules.push(byvalue_rules);
187            }
188
189            group_of_rules.iter().for_each(|rules| {
190                rules.iter().for_each(|rule| {
191                    let scale = rule.compute_scale(values);
192                    rule.values_offset.iter().for_each(|offset| {
193                        values[*offset] = (values[*offset] as f64 * scale).round() as i64
194                    })
195                })
196            });
197        }
198
199        Ok(())
200    }
201}