1use dw_models::Event;
2
3use crate::sort_by_timestamp;
4
5pub fn flood(events: Vec<Event>, pulsetime: chrono::Duration) -> Vec<Event> {
16 let mut warned_negative_gap_safe = false;
17 let mut warned_negative_gap_unsafe = false;
18 let mut events_sorted = sort_by_timestamp(events);
19 let mut e1_iter = events_sorted.drain(..).peekable();
20 let mut new_events = Vec::new();
21 let mut gap_prev: Option<chrono::Duration> = None;
22 let mut retry_e: Option<Event> = None;
23 while let Some(mut e1) = match retry_e {
24 Some(e) => {
25 retry_e = None;
26 Some(e)
27 }
28 None => e1_iter.next(),
29 } {
30 if let Some(gap) = gap_prev {
31 e1.timestamp = e1.timestamp - (gap / 2);
32 e1.duration = e1.duration + (gap / 2);
33 gap_prev = None;
34 }
35 let e2 = match e1_iter.peek() {
36 Some(e) => e,
37 None => {
38 new_events.push(e1);
39 break;
40 }
41 };
42
43 let gap = e2.timestamp - e1.calculate_endtime();
44
45 if gap < pulsetime {
46 if e1.data == e2.data {
47 if chrono::Duration::seconds(0) > gap && !warned_negative_gap_safe {
48 warn!("Gap was of negative duration ({}s), but could be safely merged. This error will only show once per batch.", gap);
49 warned_negative_gap_safe = true;
50 }
51 let e1_endtime = e1.calculate_endtime();
53 let e2_endtime = e2.calculate_endtime();
54 if e2_endtime > e1_endtime {
55 e1.duration = e2_endtime - e1.timestamp;
56 } else {
57 e1.duration = e1_endtime - e1.timestamp;
58 }
59 e1_iter.next();
61 retry_e = Some(e1);
64 continue;
67 } else {
68 if chrono::Duration::seconds(0) > gap {
69 if !warned_negative_gap_unsafe {
70 warn!("Gap was of negative duration ({}s) and could NOT be safely merged. This error will only show once per batch.", gap);
71 warned_negative_gap_unsafe = true;
72 }
73 }
74 e1.duration = e1.duration + (gap / 2);
76 gap_prev = Some(gap);
78 }
79 }
80 new_events.push(e1);
81 }
82 new_events
83}
84
85#[cfg(test)]
86mod tests {
87 use std::str::FromStr;
88
89 use chrono::DateTime;
90 use chrono::Duration;
91 use serde_json::json;
92
93 use dw_models::Event;
94
95 use super::flood;
96
97 #[test]
98 fn test_flood() {
99 let e1 = Event {
101 id: None,
102 timestamp: DateTime::from_str("2000-01-01T00:00:00Z").unwrap(),
103 duration: Duration::seconds(1),
104 data: json_map! {"test": json!(1)},
105 };
106 let e2 = Event {
107 id: None,
108 timestamp: DateTime::from_str("2000-01-01T00:00:03Z").unwrap(),
109 duration: Duration::seconds(1),
110 data: json_map! {"test": json!(1)},
111 };
112 let e_expected = Event {
113 id: None,
114 timestamp: DateTime::from_str("2000-01-01T00:00:00Z").unwrap(),
115 duration: Duration::seconds(4),
116 data: json_map! {"test": json!(1)},
117 };
118 let res = flood(vec![e1.clone(), e2.clone()], Duration::seconds(5));
119 assert_eq!(1, res.len());
120 assert_eq!(&res[0], &e_expected);
121
122 let e1 = Event {
124 id: None,
125 timestamp: DateTime::from_str("2000-01-01T00:00:00Z").unwrap(),
126 duration: Duration::seconds(1),
127 data: json_map! {"test": json!(1)},
128 };
129 let e2 = Event {
130 id: None,
131 timestamp: DateTime::from_str("2000-01-01T00:00:03Z").unwrap(),
132 duration: Duration::seconds(1),
133 data: json_map! {"test": json!(2)},
134 };
135 let e1_expected = Event {
136 id: None,
137 timestamp: DateTime::from_str("2000-01-01T00:00:00Z").unwrap(),
138 duration: Duration::seconds(2),
139 data: json_map! {"test": json!(1)},
140 };
141 let e2_expected = Event {
142 id: None,
143 timestamp: DateTime::from_str("2000-01-01T00:00:02Z").unwrap(),
144 duration: Duration::seconds(2),
145 data: json_map! {"test": json!(2)},
146 };
147 let res = flood(vec![e1.clone(), e2.clone()], Duration::seconds(5));
148 assert_eq!(2, res.len());
149 assert_eq!(&res[0], &e1_expected);
150 assert_eq!(&res[1], &e2_expected);
151 }
152
153 #[test]
154 fn test_flood_same_timestamp() {
155 let e1 = Event {
160 id: None,
161 timestamp: DateTime::from_str("2000-01-01T00:00:00Z").unwrap(),
162 duration: Duration::seconds(1),
163 data: json_map! {"status": "afk"},
164 };
165 let e2 = Event {
166 id: None,
167 timestamp: DateTime::from_str("2000-01-01T00:00:01Z").unwrap(),
168 duration: Duration::seconds(5),
169 data: json_map! {"status": "not-afk"},
170 };
171 let e3 = Event {
172 id: None,
173 timestamp: DateTime::from_str("2000-01-01T00:00:01Z").unwrap(),
174 duration: Duration::seconds(1),
175 data: json_map! {"status": "not-afk"},
176 };
177 let e4 = Event {
178 id: None,
179 timestamp: DateTime::from_str("2000-01-01T00:00:06Z").unwrap(),
180 duration: Duration::seconds(1),
181 data: json_map! {"status": "afk"},
182 };
183 let res = flood(
184 vec![e1.clone(), e2.clone(), e3.clone(), e4.clone()],
185 Duration::seconds(5),
186 );
187 assert_eq!(3, res.len());
188 assert_eq!(&res[0], &e1);
189 assert_eq!(&res[1], &e2);
190 assert_eq!(&res[2], &e4);
191
192 let e1 = Event {
198 id: None,
199 timestamp: DateTime::from_str("2000-01-01T00:00:00Z").unwrap(),
200 duration: Duration::seconds(1),
201 data: json_map! {"status": "afk"},
202 };
203 let e2 = Event {
204 id: None,
205 timestamp: DateTime::from_str("2000-01-01T00:00:01Z").unwrap(),
206 duration: Duration::seconds(5),
207 data: json_map! {"status": "not-afk"},
208 };
209 let e3 = Event {
210 id: None,
211 timestamp: DateTime::from_str("2000-01-01T00:00:01Z").unwrap(),
212 duration: Duration::seconds(1),
213 data: json_map! {"status": "not-afk"},
214 };
215 let e4 = Event {
216 id: None,
217 timestamp: DateTime::from_str("2000-01-01T00:00:01Z").unwrap(),
218 duration: Duration::seconds(10),
219 data: json_map! {"status": "not-afk"},
220 };
221 let e5 = Event {
222 id: None,
223 timestamp: DateTime::from_str("2000-01-01T00:00:11Z").unwrap(),
224 duration: Duration::seconds(1),
225 data: json_map! {"status": "afk"},
226 };
227 let res = flood(
228 vec![e1.clone(), e2.clone(), e3.clone(), e4.clone(), e5.clone()],
229 Duration::seconds(5),
230 );
231 assert_eq!(3, res.len());
232 assert_eq!(&res[0], &e1);
233 assert_eq!(&res[1], &e4);
234 assert_eq!(&res[2], &e5);
235 }
236}