1use chrono::{DateTime, TimeZone, Utc};
15use std::cmp::Reverse;
16use std::collections::{HashMap, HashSet};
17use std::io::{BufRead, BufReader, BufWriter, Read, Write};
18use std::ops::Add;
19
20pub mod file;
21
22#[derive(thiserror::Error, Debug)]
23pub enum Error {
24 #[error("I/O error")]
25 Io(#[from] std::io::Error),
26 #[error("Invalid user ID")]
27 InvalidUserId(Option<String>),
28 #[error("Invalid timestamp")]
29 InvalidTimestamp(Option<String>),
30 #[error("Invalid status code")]
31 InvalidStatus(Option<String>),
32}
33
34#[derive(Clone, Copy, Debug, Eq, PartialEq)]
35pub struct Entry {
36 pub status: u32,
37 pub observed: DateTime<Utc>,
38 pub reversal: Option<DateTime<Utc>>,
39}
40
41#[derive(Clone, Debug, Default, Eq, PartialEq)]
42pub struct DeactivationLog {
43 entries: HashMap<u64, Vec<Entry>>,
44}
45
46impl DeactivationLog {
47 pub fn lookup(&self, user_id: u64) -> Option<Vec<Entry>> {
48 self.entries.get(&user_id).cloned()
49 }
50
51 pub fn status(&self, user_id: u64) -> Option<u32> {
52 self.entries.get(&user_id).and_then(|entries| {
53 entries.iter().find_map(|entry| {
54 if entry.reversal.is_none() {
55 Some(entry.status)
56 } else {
57 None
58 }
59 })
60 })
61 }
62
63 pub fn status_timestamp(&self, user_id: u64) -> Option<DateTime<Utc>> {
64 self.entries.get(&user_id).and_then(|entries| {
65 entries.iter().find_map(|entry| {
66 if entry.reversal.is_none() {
67 Some(entry.observed)
68 } else {
69 None
70 }
71 })
72 })
73 }
74
75 pub fn deactivations(&self, status_filter: Option<u32>) -> Vec<(u64, Entry)> {
76 let mut entries = self.entries.iter().collect::<Vec<_>>();
77 entries.sort_by_key(|(user_id, _)| *user_id);
78
79 entries
80 .iter()
81 .flat_map(|(user_id, entries)| {
82 entries.iter().filter_map(|entry| {
83 if status_filter
84 .map(|status| entry.status == status)
85 .unwrap_or(true)
86 {
87 Some((**user_id, *entry))
88 } else {
89 None
90 }
91 })
92 })
93 .collect()
94 }
95
96 pub fn ever_deactivated(&self, status_filter: Option<u32>) -> HashSet<u64> {
97 self.entries
98 .iter()
99 .filter_map(|(user_id, entries)| {
100 if entries.iter().any(|entry| {
101 status_filter
102 .map(|status| entry.status == status)
103 .unwrap_or(true)
104 }) {
105 Some(*user_id)
106 } else {
107 None
108 }
109 })
110 .collect()
111 }
112
113 pub fn current_deactivated(&self, status_filter: Option<u32>) -> HashSet<u64> {
114 self.entries
115 .iter()
116 .filter_map(|(user_id, entries)| {
117 if entries
118 .last()
119 .map(|entry| {
120 entry.reversal.is_none()
121 && status_filter
122 .map(|status| entry.status == status)
123 .unwrap_or(true)
124 })
125 .unwrap_or(false)
126 {
127 Some(*user_id)
128 } else {
129 None
130 }
131 })
132 .collect()
133 }
134
135 pub fn update_with_reversals<I: Iterator<Item = (u64, DateTime<Utc>)>>(
136 &mut self,
137 reversals: I,
138 ) -> Result<(), Vec<(u64, DateTime<Utc>)>> {
139 let mut invalid_pairs = vec![];
140
141 for (user_id, timestamp) in reversals {
142 match self
143 .entries
144 .get_mut(&user_id)
145 .and_then(|entries| entries.last_mut())
146 {
147 Some(last) => {
148 if last.reversal.is_none() {
149 last.reversal = Some(timestamp);
150 } else {
151 invalid_pairs.push((user_id, timestamp));
152 }
153 }
154 None => {
155 invalid_pairs.push((user_id, timestamp));
156 }
157 }
158 }
159
160 if invalid_pairs.is_empty() {
161 Ok(())
162 } else {
163 Err(invalid_pairs)
164 }
165 }
166
167 pub fn validate(&self) -> Result<(), Vec<u64>> {
168 let mut invalid_user_ids = self
169 .entries
170 .iter()
171 .filter_map(|(user_id, entries)| {
172 if !entries.is_empty() && Self::validate_entries(entries) {
173 None
174 } else {
175 Some(*user_id)
176 }
177 })
178 .collect::<Vec<_>>();
179
180 invalid_user_ids.sort_unstable();
181
182 if invalid_user_ids.is_empty() {
183 Ok(())
184 } else {
185 Err(invalid_user_ids)
186 }
187 }
188
189 fn validate_entries(entries: &[Entry]) -> bool {
190 let valid_pairs = entries.windows(2).all(|pair| match pair[0].reversal {
191 Some(reversal) => pair[0].observed < reversal && pair[0].observed < pair[1].observed,
192 None => false,
193 });
194
195 valid_pairs
198 && match entries.last() {
199 Some(entry) => match entry.reversal {
200 Some(reversal) => entry.observed < reversal,
201 None => true,
202 },
203 None => true,
204 }
205 }
206
207 pub fn read<R: Read>(reader: R) -> Result<Self, Error> {
208 let mut entries: HashMap<u64, Vec<Entry>> = HashMap::new();
209
210 for line in BufReader::new(reader).lines() {
211 let line = line?;
212 let fields = line.split(',').collect::<Vec<_>>();
213
214 let user_id = fields
215 .first()
216 .and_then(|value| value.parse::<u64>().ok())
217 .ok_or_else(|| {
218 Error::InvalidUserId(fields.first().map(|value| value.to_string()))
219 })?;
220
221 let status = fields
222 .get(1)
223 .and_then(|value| value.parse::<u32>().ok())
224 .ok_or_else(|| {
225 Error::InvalidStatus(fields.get(1).map(|value| value.to_string()))
226 })?;
227
228 let observed = fields
229 .get(2)
230 .and_then(|value| value.parse::<i64>().ok())
231 .map(|value| Utc.timestamp(value, 0))
232 .ok_or_else(|| {
233 Error::InvalidTimestamp(fields.get(2).map(|value| value.to_string()))
234 })?;
235
236 let reversal = fields
237 .get(3)
238 .and_then(|value| {
239 if value.is_empty() {
240 Some(None)
241 } else {
242 value
243 .parse::<i64>()
244 .ok()
245 .map(|value| Some(Utc.timestamp(value, 0)))
246 }
247 })
248 .ok_or_else(|| {
249 Error::InvalidTimestamp(fields.get(3).map(|value| value.to_string()))
250 })?;
251
252 let seen = entries.entry(user_id).or_default();
253 seen.push(Entry {
254 status,
255 observed,
256 reversal,
257 });
258 }
259
260 Ok(Self { entries })
261 }
262
263 pub fn write<W: Write>(&self, writer: W) -> Result<(), std::io::Error> {
264 let mut entries = self.entries.iter().collect::<Vec<_>>();
265 entries.sort_by_key(|(user_id, _)| *user_id);
266
267 let mut writer = BufWriter::new(writer);
268
269 for (user_id, entries) in entries {
270 for entry in entries {
271 writeln!(
272 writer,
273 "{},{},{},{}",
274 user_id,
275 entry.status,
276 entry.observed.timestamp(),
277 entry
278 .reversal
279 .map(|value| value.timestamp().to_string())
280 .unwrap_or_default()
281 )?;
282 }
283 }
284
285 Ok(())
286 }
287
288 pub fn add(&mut self, user_id: u64, status: u32, observed: DateTime<Utc>) {
289 let mut singleton_entries = HashMap::new();
290 singleton_entries.insert(
291 user_id,
292 vec![Entry {
293 status,
294 observed,
295 reversal: None,
296 }],
297 );
298
299 let singleton_log = Self {
300 entries: singleton_entries,
301 };
302
303 let self_log: &Self = self;
304 let new_log = self_log + &singleton_log;
305
306 self.entries = new_log.entries;
307 }
308
309 pub fn add_all(&mut self, updates: HashMap<u64, (u32, DateTime<Utc>)>) {
310 let mut update_entries = HashMap::new();
311
312 for (user_id, (status, observed)) in updates.into_iter() {
313 update_entries.insert(
314 user_id,
315 vec![Entry {
316 status,
317 observed,
318 reversal: None,
319 }],
320 );
321 }
322
323 let update_log = Self {
324 entries: update_entries,
325 };
326
327 let self_log: &Self = self;
328 let new_log = self_log + &update_log;
329
330 self.entries = new_log.entries;
331 }
332}
333
334impl Add for &DeactivationLog {
335 type Output = DeactivationLog;
336
337 fn add(self, other: Self) -> Self::Output {
338 let mut new_entry_map = self.entries.clone();
339
340 for (user_id, entries) in &other.entries {
341 let new_entries = new_entry_map.entry(*user_id).or_default();
342 new_entries.extend(entries.clone());
343 new_entries.sort_by_key(|entry| (entry.observed, Reverse(entry.reversal)));
345 new_entries.dedup_by_key(|entry| (entry.observed, entry.status));
346
347 let len = new_entries.len();
348 if len >= 2 {
349 let last1 = &new_entries[len - 2];
350 let last2 = &new_entries[len - 1];
351 if last1.status == last2.status
352 && last1.reversal.is_none()
353 && last2.reversal.is_none()
354 {
355 new_entries.pop();
356 }
357 }
358 }
359
360 Self::Output {
361 entries: new_entry_map,
362 }
363 }
364}