hst_deactivations/
lib.rs

//! Simple database for tracking user account deactivations.
//!
//! This library was originally designed to track Twitter account suspensions and
//! self-deactivations, but should be general enough to work in other contexts.
//!
//! It makes a few assumptions:
//!
//! * Users have an integral identifier (e.g. the Twitter ID).
//! * Deactivations have an integral status code (e.g. for Twitter, 50 for self-deactivation and
//!   63 for suspension).
//! * A deactivation has a time at which it was first observed and (optionally) another at which it
//!   was reversed.
13
14use chrono::{DateTime, TimeZone, Utc};
15use std::cmp::Reverse;
16use std::collections::{HashMap, HashSet};
17use std::io::{BufRead, BufReader, BufWriter, Read, Write};
18use std::ops::Add;
19
20pub mod file;
21
22#[derive(thiserror::Error, Debug)]
23pub enum Error {
24    #[error("I/O error")]
25    Io(#[from] std::io::Error),
26    #[error("Invalid user ID")]
27    InvalidUserId(Option<String>),
28    #[error("Invalid timestamp")]
29    InvalidTimestamp(Option<String>),
30    #[error("Invalid status code")]
31    InvalidStatus(Option<String>),
32}
33
34#[derive(Clone, Copy, Debug, Eq, PartialEq)]
35pub struct Entry {
36    pub status: u32,
37    pub observed: DateTime<Utc>,
38    pub reversal: Option<DateTime<Utc>>,
39}
40
41#[derive(Clone, Debug, Default, Eq, PartialEq)]
42pub struct DeactivationLog {
43    entries: HashMap<u64, Vec<Entry>>,
44}
45
46impl DeactivationLog {
47    pub fn lookup(&self, user_id: u64) -> Option<Vec<Entry>> {
48        self.entries.get(&user_id).cloned()
49    }
50
51    pub fn status(&self, user_id: u64) -> Option<u32> {
52        self.entries.get(&user_id).and_then(|entries| {
53            entries.iter().find_map(|entry| {
54                if entry.reversal.is_none() {
55                    Some(entry.status)
56                } else {
57                    None
58                }
59            })
60        })
61    }
62
63    pub fn status_timestamp(&self, user_id: u64) -> Option<DateTime<Utc>> {
64        self.entries.get(&user_id).and_then(|entries| {
65            entries.iter().find_map(|entry| {
66                if entry.reversal.is_none() {
67                    Some(entry.observed)
68                } else {
69                    None
70                }
71            })
72        })
73    }
74
75    pub fn deactivations(&self, status_filter: Option<u32>) -> Vec<(u64, Entry)> {
76        let mut entries = self.entries.iter().collect::<Vec<_>>();
77        entries.sort_by_key(|(user_id, _)| *user_id);
78
79        entries
80            .iter()
81            .flat_map(|(user_id, entries)| {
82                entries.iter().filter_map(|entry| {
83                    if status_filter
84                        .map(|status| entry.status == status)
85                        .unwrap_or(true)
86                    {
87                        Some((**user_id, *entry))
88                    } else {
89                        None
90                    }
91                })
92            })
93            .collect()
94    }
95
96    pub fn ever_deactivated(&self, status_filter: Option<u32>) -> HashSet<u64> {
97        self.entries
98            .iter()
99            .filter_map(|(user_id, entries)| {
100                if entries.iter().any(|entry| {
101                    status_filter
102                        .map(|status| entry.status == status)
103                        .unwrap_or(true)
104                }) {
105                    Some(*user_id)
106                } else {
107                    None
108                }
109            })
110            .collect()
111    }
112
113    pub fn current_deactivated(&self, status_filter: Option<u32>) -> HashSet<u64> {
114        self.entries
115            .iter()
116            .filter_map(|(user_id, entries)| {
117                if entries
118                    .last()
119                    .map(|entry| {
120                        entry.reversal.is_none()
121                            && status_filter
122                                .map(|status| entry.status == status)
123                                .unwrap_or(true)
124                    })
125                    .unwrap_or(false)
126                {
127                    Some(*user_id)
128                } else {
129                    None
130                }
131            })
132            .collect()
133    }
134
135    pub fn update_with_reversals<I: Iterator<Item = (u64, DateTime<Utc>)>>(
136        &mut self,
137        reversals: I,
138    ) -> Result<(), Vec<(u64, DateTime<Utc>)>> {
139        let mut invalid_pairs = vec![];
140
141        for (user_id, timestamp) in reversals {
142            match self
143                .entries
144                .get_mut(&user_id)
145                .and_then(|entries| entries.last_mut())
146            {
147                Some(last) => {
148                    if last.reversal.is_none() {
149                        last.reversal = Some(timestamp);
150                    } else {
151                        invalid_pairs.push((user_id, timestamp));
152                    }
153                }
154                None => {
155                    invalid_pairs.push((user_id, timestamp));
156                }
157            }
158        }
159
160        if invalid_pairs.is_empty() {
161            Ok(())
162        } else {
163            Err(invalid_pairs)
164        }
165    }
166
167    pub fn validate(&self) -> Result<(), Vec<u64>> {
168        let mut invalid_user_ids = self
169            .entries
170            .iter()
171            .filter_map(|(user_id, entries)| {
172                if !entries.is_empty() && Self::validate_entries(entries) {
173                    None
174                } else {
175                    Some(*user_id)
176                }
177            })
178            .collect::<Vec<_>>();
179
180        invalid_user_ids.sort_unstable();
181
182        if invalid_user_ids.is_empty() {
183            Ok(())
184        } else {
185            Err(invalid_user_ids)
186        }
187    }
188
189    fn validate_entries(entries: &[Entry]) -> bool {
190        let valid_pairs = entries.windows(2).all(|pair| match pair[0].reversal {
191            Some(reversal) => pair[0].observed < reversal && pair[0].observed < pair[1].observed,
192            None => false,
193        });
194
195        // We still have to checked whether the reversal (if there was one) for the final entry
196        // happened after the observation.
197        valid_pairs
198            && match entries.last() {
199                Some(entry) => match entry.reversal {
200                    Some(reversal) => entry.observed < reversal,
201                    None => true,
202                },
203                None => true,
204            }
205    }
206
207    pub fn read<R: Read>(reader: R) -> Result<Self, Error> {
208        let mut entries: HashMap<u64, Vec<Entry>> = HashMap::new();
209
210        for line in BufReader::new(reader).lines() {
211            let line = line?;
212            let fields = line.split(',').collect::<Vec<_>>();
213
214            let user_id = fields
215                .first()
216                .and_then(|value| value.parse::<u64>().ok())
217                .ok_or_else(|| {
218                    Error::InvalidUserId(fields.first().map(|value| value.to_string()))
219                })?;
220
221            let status = fields
222                .get(1)
223                .and_then(|value| value.parse::<u32>().ok())
224                .ok_or_else(|| {
225                    Error::InvalidStatus(fields.get(1).map(|value| value.to_string()))
226                })?;
227
228            let observed = fields
229                .get(2)
230                .and_then(|value| value.parse::<i64>().ok())
231                .map(|value| Utc.timestamp(value, 0))
232                .ok_or_else(|| {
233                    Error::InvalidTimestamp(fields.get(2).map(|value| value.to_string()))
234                })?;
235
236            let reversal = fields
237                .get(3)
238                .and_then(|value| {
239                    if value.is_empty() {
240                        Some(None)
241                    } else {
242                        value
243                            .parse::<i64>()
244                            .ok()
245                            .map(|value| Some(Utc.timestamp(value, 0)))
246                    }
247                })
248                .ok_or_else(|| {
249                    Error::InvalidTimestamp(fields.get(3).map(|value| value.to_string()))
250                })?;
251
252            let seen = entries.entry(user_id).or_default();
253            seen.push(Entry {
254                status,
255                observed,
256                reversal,
257            });
258        }
259
260        Ok(Self { entries })
261    }
262
263    pub fn write<W: Write>(&self, writer: W) -> Result<(), std::io::Error> {
264        let mut entries = self.entries.iter().collect::<Vec<_>>();
265        entries.sort_by_key(|(user_id, _)| *user_id);
266
267        let mut writer = BufWriter::new(writer);
268
269        for (user_id, entries) in entries {
270            for entry in entries {
271                writeln!(
272                    writer,
273                    "{},{},{},{}",
274                    user_id,
275                    entry.status,
276                    entry.observed.timestamp(),
277                    entry
278                        .reversal
279                        .map(|value| value.timestamp().to_string())
280                        .unwrap_or_default()
281                )?;
282            }
283        }
284
285        Ok(())
286    }
287
288    pub fn add(&mut self, user_id: u64, status: u32, observed: DateTime<Utc>) {
289        let mut singleton_entries = HashMap::new();
290        singleton_entries.insert(
291            user_id,
292            vec![Entry {
293                status,
294                observed,
295                reversal: None,
296            }],
297        );
298
299        let singleton_log = Self {
300            entries: singleton_entries,
301        };
302
303        let self_log: &Self = self;
304        let new_log = self_log + &singleton_log;
305
306        self.entries = new_log.entries;
307    }
308
309    pub fn add_all(&mut self, updates: HashMap<u64, (u32, DateTime<Utc>)>) {
310        let mut update_entries = HashMap::new();
311
312        for (user_id, (status, observed)) in updates.into_iter() {
313            update_entries.insert(
314                user_id,
315                vec![Entry {
316                    status,
317                    observed,
318                    reversal: None,
319                }],
320            );
321        }
322
323        let update_log = Self {
324            entries: update_entries,
325        };
326
327        let self_log: &Self = self;
328        let new_log = self_log + &update_log;
329
330        self.entries = new_log.entries;
331    }
332}
333
334impl Add for &DeactivationLog {
335    type Output = DeactivationLog;
336
337    fn add(self, other: Self) -> Self::Output {
338        let mut new_entry_map = self.entries.clone();
339
340        for (user_id, entries) in &other.entries {
341            let new_entries = new_entry_map.entry(*user_id).or_default();
342            new_entries.extend(entries.clone());
343            // We want non-empty reversals to come first.
344            new_entries.sort_by_key(|entry| (entry.observed, Reverse(entry.reversal)));
345            new_entries.dedup_by_key(|entry| (entry.observed, entry.status));
346
347            let len = new_entries.len();
348            if len >= 2 {
349                let last1 = &new_entries[len - 2];
350                let last2 = &new_entries[len - 1];
351                if last1.status == last2.status
352                    && last1.reversal.is_none()
353                    && last2.reversal.is_none()
354                {
355                    new_entries.pop();
356                }
357            }
358        }
359
360        Self::Output {
361            entries: new_entry_map,
362        }
363    }
364}