1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
use crate::event::Event;
use std::cmp::{max, min};
use chrono::{DateTime, Utc, Duration};
use serde::{Deserialize, Serialize};
#[derive(Clone, Debug, Deserialize, PartialEq, Serialize)]
pub enum EventAggregationRule {
#[serde(rename = "sum")]
Sum,
#[serde(rename = "avg")]
Avg
}
#[derive(Clone, Debug, Deserialize, PartialEq, Serialize)]
pub struct Campaign {
pub start: DateTime<Utc>,
pub end: DateTime<Utc>,
#[serde(rename = "aggregationPeriodLength")]
pub aggregation_period_length: u32,
#[serde(rename = "numberOfPeriods", default = "Campaign::default_number_of_periods")]
pub number_of_periods: u32,
#[serde(rename = "onlyRecordOnce", default = "Campaign::default_only_record_once")]
pub only_record_once: bool,
#[serde(rename = "eventAggregationRule", default = "Campaign::default_event_aggregation_rule")]
pub event_aggregation_rule: EventAggregationRule,
#[serde(rename = "strengthenAnonymity", default = "Campaign::default_strengthen_anonymity")]
pub strengthen_anonymity: bool,
}
impl Campaign {
pub fn new(start: DateTime<Utc>, end: DateTime<Utc>, aggregation_period_length: u32) -> Campaign
{
Campaign {
start, end, aggregation_period_length,
number_of_periods: Campaign::default_number_of_periods(),
only_record_once: Campaign::default_only_record_once(),
event_aggregation_rule: Campaign::default_event_aggregation_rule(),
strengthen_anonymity: Campaign::default_strengthen_anonymity()}
}
pub fn default_number_of_periods() -> u32 { 1 }
pub fn default_only_record_once() -> bool { false }
pub fn default_event_aggregation_rule() -> EventAggregationRule { EventAggregationRule::Sum }
pub fn default_strengthen_anonymity() -> bool { false }
pub fn aggregation_period(&self) -> Duration {
Duration::days(self.aggregation_period_length as i64)
}
pub fn current_measurement_period(&self) -> Option<(DateTime<Utc>, DateTime<Utc>)> {
if self.number_of_periods <= 0 {
return None
}
let mut now = Utc::now();
let mut period_end = self.start.clone();
let aggregation_period = self.aggregation_period();
loop {
period_end = period_end + aggregation_period;
if period_end > now {
break
}
}
let period_start = max(period_end - aggregation_period, self.start.clone());
period_end = min(period_end, self.end.clone());
now = Utc::now();
if period_start >= now || period_end <= now {
return None
}
Some((period_start, period_end))
}
pub fn next_total_measurement_period(&self) -> Option<(DateTime<Utc>, DateTime<Utc>)> {
let current = self.current_measurement_period()?;
let period_start = if self.strengthen_anonymity { current.1 } else { current.0 };
let mut period_end = period_start.clone();
let mut counter = 0;
let aggregation_period = self.aggregation_period();
while counter < self.number_of_periods && period_end + aggregation_period <= self.end {
period_end = period_end + aggregation_period;
counter += 1;
}
if period_start == period_end {
return None
}
Some((period_start, period_end))
}
pub fn apply(&self, value: Option<f64>, event: &mut Event) {
if value == None || self.only_record_once {
return
}
let old_val = event.value.unwrap_or(0.0);
match &self.event_aggregation_rule {
EventAggregationRule::Sum => {
event.value = Some(old_val + value.unwrap());
}
EventAggregationRule::Avg => {
let times = event.times as f64;
event.value = Some((old_val * (times - 1.0) + value.unwrap()) / times);
}
}
}
}