dw_transform/
flood.rs

1use dw_models::Event;
2
3use crate::sort_by_timestamp;
4
5/// Floods event to the nearest neighbouring event if within the specified pulsetime
6///
7/// Also merges events if they have the same data and are within the pulsetime
8///
9/// # Example
10/// ```ignore
11/// pulsetime: 1 second (one space)
12/// input:  [a] [a]  [b][b]    [b][c]
13/// output: [a     ][b    ]    [b][c]
14/// ```
15pub fn flood(events: Vec<Event>, pulsetime: chrono::Duration) -> Vec<Event> {
16    let mut warned_negative_gap_safe = false;
17    let mut warned_negative_gap_unsafe = false;
18    let mut events_sorted = sort_by_timestamp(events);
19    let mut e1_iter = events_sorted.drain(..).peekable();
20    let mut new_events = Vec::new();
21    let mut gap_prev: Option<chrono::Duration> = None;
22    let mut retry_e: Option<Event> = None;
23    while let Some(mut e1) = match retry_e {
24        Some(e) => {
25            retry_e = None;
26            Some(e)
27        }
28        None => e1_iter.next(),
29    } {
30        if let Some(gap) = gap_prev {
31            e1.timestamp = e1.timestamp - (gap / 2);
32            e1.duration = e1.duration + (gap / 2);
33            gap_prev = None;
34        }
35        let e2 = match e1_iter.peek() {
36            Some(e) => e,
37            None => {
38                new_events.push(e1);
39                break;
40            }
41        };
42
43        let gap = e2.timestamp - e1.calculate_endtime();
44
45        if gap < pulsetime {
46            if e1.data == e2.data {
47                if chrono::Duration::seconds(0) > gap && !warned_negative_gap_safe {
48                    warn!("Gap was of negative duration ({}s), but could be safely merged. This error will only show once per batch.", gap);
49                    warned_negative_gap_safe = true;
50                }
51                // Choose the longest event and set the endtime to it
52                let e1_endtime = e1.calculate_endtime();
53                let e2_endtime = e2.calculate_endtime();
54                if e2_endtime > e1_endtime {
55                    e1.duration = e2_endtime - e1.timestamp;
56                } else {
57                    e1.duration = e1_endtime - e1.timestamp;
58                }
59                // Drop next event since they are merged and flooded into e1
60                e1_iter.next();
61                // Retry this event again to give it a change to merge e1
62                // with 'e3'
63                retry_e = Some(e1);
64                // Since we are retrying on this event we don't want to push it
65                // to the new_events vec
66                continue;
67            } else {
68                if chrono::Duration::seconds(0) > gap {
69                    if !warned_negative_gap_unsafe {
70                        warn!("Gap was of negative duration ({}s) and could NOT be safely merged. This error will only show once per batch.", gap);
71                        warned_negative_gap_unsafe = true;
72                    }
73                }
74                // Extend e1 to the middle between e1 and e2
75                e1.duration = e1.duration + (gap / 2);
76                // Make sure next event is pre-extended
77                gap_prev = Some(gap);
78            }
79        }
80        new_events.push(e1);
81    }
82    new_events
83}
84
#[cfg(test)]
mod tests {
    use std::str::FromStr;

    use chrono::DateTime;
    use chrono::Duration;
    use serde_json::json;

    use dw_models::Event;

    use super::flood;

    // Builds an id-less `Event` from an RFC 3339 timestamp string, a duration in
    // whole seconds and a data map expression. A macro rather than a function is
    // used so the concrete type of `Event::data` never has to be spelled out.
    macro_rules! make_event {
        ($timestamp:expr, $seconds:expr, $data:expr) => {
            Event {
                id: None,
                timestamp: DateTime::from_str($timestamp).unwrap(),
                duration: Duration::seconds($seconds),
                data: $data,
            }
        };
    }

    #[test]
    fn test_flood() {
        let pulsetime = Duration::seconds(5);

        // Equal data, 2s apart: both collapse into one 4s event
        let merged = flood(
            vec![
                make_event!("2000-01-01T00:00:00Z", 1, json_map! {"test": json!(1)}),
                make_event!("2000-01-01T00:00:03Z", 1, json_map! {"test": json!(1)}),
            ],
            pulsetime,
        );
        let want_merged = make_event!("2000-01-01T00:00:00Z", 4, json_map! {"test": json!(1)});
        assert_eq!(1, merged.len());
        assert_eq!(&merged[0], &want_merged);

        // Different data, 2s apart: each side floods halfway into the gap
        let split = flood(
            vec![
                make_event!("2000-01-01T00:00:00Z", 1, json_map! {"test": json!(1)}),
                make_event!("2000-01-01T00:00:03Z", 1, json_map! {"test": json!(2)}),
            ],
            pulsetime,
        );
        let want_left = make_event!("2000-01-01T00:00:00Z", 2, json_map! {"test": json!(1)});
        let want_right = make_event!("2000-01-01T00:00:02Z", 2, json_map! {"test": json!(2)});
        assert_eq!(2, split.len());
        assert_eq!(&split[0], &want_left);
        assert_eq!(&split[1], &want_right);
    }

    #[test]
    fn test_flood_same_timestamp() {
        let pulsetime = Duration::seconds(5);

        // Two "not-afk" events share a start time; the longer one (5s) wins and the
        // surrounding "afk" events are untouched since they already abut it.
        let afk_start = make_event!("2000-01-01T00:00:00Z", 1, json_map! {"status": "afk"});
        let long_active = make_event!("2000-01-01T00:00:01Z", 5, json_map! {"status": "not-afk"});
        let short_active = make_event!("2000-01-01T00:00:01Z", 1, json_map! {"status": "not-afk"});
        let afk_end = make_event!("2000-01-01T00:00:06Z", 1, json_map! {"status": "afk"});
        let out = flood(
            vec![
                afk_start.clone(),
                long_active.clone(),
                short_active,
                afk_end.clone(),
            ],
            pulsetime,
        );
        assert_eq!(3, out.len());
        assert_eq!(&out[0], &afk_start);
        assert_eq!(&out[1], &long_active);
        assert_eq!(&out[2], &afk_end);

        // Three "not-afk" events share a start time; the last and longest (10s)
        // determines the merged duration.
        let afk_start = make_event!("2000-01-01T00:00:00Z", 1, json_map! {"status": "afk"});
        let mid_active = make_event!("2000-01-01T00:00:01Z", 5, json_map! {"status": "not-afk"});
        let short_active = make_event!("2000-01-01T00:00:01Z", 1, json_map! {"status": "not-afk"});
        let longest_active =
            make_event!("2000-01-01T00:00:01Z", 10, json_map! {"status": "not-afk"});
        let afk_end = make_event!("2000-01-01T00:00:11Z", 1, json_map! {"status": "afk"});
        let out = flood(
            vec![
                afk_start.clone(),
                mid_active,
                short_active,
                longest_active.clone(),
                afk_end.clone(),
            ],
            pulsetime,
        );
        assert_eq!(3, out.len());
        assert_eq!(&out[0], &afk_start);
        assert_eq!(&out[1], &longest_active);
        assert_eq!(&out[2], &afk_end);
    }
}