memory_lol/db/
accounts.rs

1use super::{
2    table::{Mode, Table, Writeable},
3    util::is_valid_screen_name,
4    Error,
5};
6use chrono::{Duration, NaiveDate};
7use rocksdb::{DBIterator, IteratorMode, MergeOperands, Options, DB};
8use std::collections::HashMap;
9use std::convert::TryInto;
10use std::marker::PhantomData;
11use std::path::Path;
12
/// Summary statistics for an account table.
///
/// A plain value type made of two counters, so it also derives `Copy`,
/// `Default` and `Hash` in addition to the comparison traits.
#[derive(Clone, Copy, Debug, Default, Eq, Hash, PartialEq)]
pub struct AccountTableCounts {
    /// Number of distinct account IDs that have at least one entry.
    pub id_count: u64,
    /// Total number of (account ID, screen name) entries.
    pub pair_count: u64,
}
18
19pub struct AccountTable<M> {
20    db: DB,
21    mode: PhantomData<M>,
22}
23
24impl<M> Table for AccountTable<M> {
25    type Counts = AccountTableCounts;
26
27    fn underlying(&self) -> &DB {
28        &self.db
29    }
30
31    fn get_counts(&self) -> Result<Self::Counts, Error> {
32        let mut pair_count = 0;
33        let mut id_count = 0;
34        let mut last_id = 0;
35
36        let iter = self.db.iterator(IteratorMode::Start);
37
38        for result in iter {
39            let (key, _) = result?;
40            pair_count += 1;
41
42            let id = key_prefix_to_id(&key)?;
43
44            if id != last_id {
45                id_count += 1;
46                last_id = id;
47            }
48        }
49
50        Ok(Self::Counts {
51            id_count,
52            pair_count,
53        })
54    }
55}
56
57impl<M> AccountTable<M> {
58    pub fn pairs(&self) -> PairIterator {
59        PairIterator {
60            underlying: self.db.iterator(IteratorMode::Start),
61        }
62    }
63
64    pub fn lookup(&self, id: u64) -> Result<HashMap<String, Vec<NaiveDate>>, Error> {
65        let prefix = id_to_key_prefix(id);
66        let iter = self.db.prefix_iterator(prefix);
67        let mut results = HashMap::new();
68
69        for result in iter {
70            let (key, value) = result?;
71            let (next_id, next_screen_name) = key_to_pair(&key)?;
72
73            if next_id == id {
74                let dates = value_to_dates(&value)?;
75                results.insert(next_screen_name.to_string(), dates);
76            } else {
77                break;
78            }
79        }
80
81        Ok(results)
82    }
83
84    pub fn limited_lookup(
85        &self,
86        id: u64,
87        earliest: NaiveDate,
88    ) -> Result<HashMap<String, Vec<NaiveDate>>, Error> {
89        let prefix = id_to_key_prefix(id);
90        let iter = self.db.prefix_iterator(prefix);
91        let mut results = HashMap::new();
92
93        for result in iter {
94            let (key, value) = result?;
95            let (next_id, next_screen_name) = key_to_pair(&key)?;
96
97            if next_id == id {
98                let dates = value_to_dates(&value)?;
99                if dates.iter().any(|date| date >= &earliest) {
100                    results.insert(next_screen_name.to_string(), dates);
101                }
102            } else {
103                break;
104            }
105        }
106
107        Ok(results)
108    }
109
110    pub fn get_date_counts(&self) -> Result<Vec<(NaiveDate, u64)>, Error> {
111        let mut map = HashMap::new();
112        let iter = self.db.iterator(IteratorMode::Start);
113
114        for result in iter {
115            let (_, value) = result?;
116            let dates = value_to_dates(&value)?;
117
118            for date in dates {
119                let count = map.entry(date).or_default();
120                *count += 1;
121            }
122        }
123
124        let mut result = map.into_iter().collect::<Vec<_>>();
125        result.sort();
126
127        Ok(result)
128    }
129
130    pub fn get_most_screen_names(&self, k: usize) -> Result<Vec<(u64, Vec<String>)>, Error> {
131        let mut queue = priority_queue::DoublePriorityQueue::with_capacity(k);
132        let iter = self.db.iterator(IteratorMode::Start);
133        let mut last_id = 0;
134        let mut current: Vec<String> = vec![];
135
136        for result in iter {
137            let (key, _) = result?;
138            let (id, screen_name) = key_to_pair(&key)?;
139
140            if id != last_id {
141                let min = queue.peek_min().map(|(_, count)| *count).unwrap_or(0);
142                let len = current.len();
143
144                if len >= min || queue.len() < k {
145                    queue.push((last_id, current.drain(..).collect()), len);
146
147                    if queue.len() > k {
148                        queue.pop_min();
149                    }
150                } else {
151                    current.clear();
152                }
153
154                last_id = id;
155            }
156            current.push(screen_name.to_string());
157        }
158
159        Ok(queue.into_descending_sorted_vec())
160    }
161}
162
163impl<M: Mode> AccountTable<M> {
164    pub fn open<P: AsRef<Path>>(path: P) -> Result<Self, Error> {
165        let mut options = Options::default();
166        options.create_if_missing(true);
167        options.set_merge_operator_associative("merge", merge);
168
169        let db = if M::is_read_only() {
170            DB::open_for_read_only(&options, path, true)?
171        } else {
172            DB::open(&options, path)?
173        };
174
175        Ok(Self {
176            db,
177            mode: PhantomData,
178        })
179    }
180}
181
182impl AccountTable<Writeable> {
183    pub fn insert(&self, id: u64, screen_name: &str, dates: Vec<NaiveDate>) -> Result<(), Error> {
184        if is_valid_screen_name(screen_name) {
185            let mut value = Vec::with_capacity(2 * dates.len());
186
187            for date in dates {
188                value.extend_from_slice(&date_to_day_id(&date)?.to_be_bytes());
189            }
190
191            self.db.merge(pair_to_key(id, screen_name), value)?;
192
193            Ok(())
194        } else {
195            Err(Error::InvalidScreenName(screen_name.to_string()))
196        }
197    }
198
199    pub fn remove(&self, id: u64, screen_name: &str) -> Result<(), Error> {
200        let key = pair_to_key(id, screen_name);
201
202        Ok(self.db.delete(key)?)
203    }
204
205    pub fn compact_ranges(&self) -> Result<(), Error> {
206        let iter = self.db.iterator(IteratorMode::Start);
207
208        for result in iter {
209            let (key, value) = result?;
210            let mut dates = value_to_dates(&value)?;
211
212            // If we don't have more than a range we don't need to compact
213            if dates.len() > 2 {
214                dates.sort();
215                dates.dedup();
216
217                let compacted_dates = if dates.len() <= 2 {
218                    dates
219                } else {
220                    let mut compacted_dates = Vec::with_capacity(2);
221
222                    if let Some(first) = dates.first() {
223                        compacted_dates.push(*first);
224                    }
225
226                    if let Some(last) = dates.last() {
227                        compacted_dates.push(*last);
228                    }
229
230                    compacted_dates
231                };
232
233                let mut new_value = Vec::with_capacity(2 * compacted_dates.len());
234
235                for date in compacted_dates {
236                    new_value.extend_from_slice(&date_to_day_id(&date)?.to_be_bytes());
237                }
238
239                self.db.put(key, new_value)?;
240            }
241        }
242
243        Ok(())
244    }
245}
246
247fn merge(
248    _new_key: &[u8],
249    existing_val: Option<&[u8]>,
250    operands: &MergeOperands,
251) -> Option<Vec<u8>> {
252    let mut new_val = match existing_val {
253        Some(bytes) => bytes.to_vec(),
254        None => Vec::with_capacity(operands.len() * 2),
255    };
256
257    for operand in operands.iter() {
258        merge_for_pair(&mut new_val, operand);
259    }
260
261    Some(new_val)
262}
263
264fn merge_for_pair(a: &mut Vec<u8>, b: &[u8]) {
265    let original_len = a.len();
266    let mut i = 0;
267
268    while i < b.len() {
269        let bytes: [u8; 2] = match b[i..i + 2].try_into() {
270            Ok(bytes) => bytes,
271            Err(error) => {
272                log::error!("{}", error);
273                return;
274            }
275        };
276        let next_b = u16::from_be_bytes(bytes);
277
278        let mut found = false;
279        let mut j = 0;
280
281        while !found && j < original_len {
282            let bytes = match a[j..j + 2].try_into() {
283                Ok(bytes) => bytes,
284                Err(error) => {
285                    log::error!("{}", error);
286                    return;
287                }
288            };
289            let next_a = u16::from_be_bytes(bytes);
290            found = next_a == next_b;
291            j += 2;
292        }
293
294        if !found {
295            a.extend_from_slice(&b[i..i + 2]);
296        }
297        i += 2;
298    }
299}
300
301pub struct PairIterator<'a> {
302    underlying: DBIterator<'a>,
303}
304
305impl Iterator for PairIterator<'_> {
306    type Item = Result<(u64, String, Vec<NaiveDate>), Error>;
307
308    fn next(&mut self) -> Option<Self::Item> {
309        self.underlying.next().map(|result| {
310            result
311                .map_err(Error::from)
312                .and_then(|(key, value)| kv_to_item(&key, &value))
313        })
314    }
315}
316
317fn kv_to_item(key: &[u8], value: &[u8]) -> Result<(u64, String, Vec<NaiveDate>), Error> {
318    let (id, screen_name) = key_to_pair(key)?;
319    let dates = value_to_dates(value)?;
320
321    Ok((id, screen_name.to_string(), dates))
322}
323
/// Encodes an account ID as the eight-byte big-endian prefix used in keys.
fn id_to_key_prefix(id: u64) -> [u8; 8] {
    u64::to_be_bytes(id)
}
327
328fn key_prefix_to_id(key: &[u8]) -> Result<u64, Error> {
329    Ok(u64::from_be_bytes(
330        key[0..8]
331            .try_into()
332            .map_err(|_| Error::InvalidKey(key.to_vec()))?,
333    ))
334}
335
/// Builds the key for an `(id, screen_name)` pair: the eight-byte
/// big-endian ID followed by the screen name's UTF-8 bytes.
fn pair_to_key(id: u64, screen_name: &str) -> Vec<u8> {
    let id_bytes = id.to_be_bytes();

    [&id_bytes[..], screen_name.as_bytes()].concat()
}
343
344fn key_to_pair(key: &[u8]) -> Result<(u64, &str), Error> {
345    let id = key_prefix_to_id(key)?;
346    let screen_name = std::str::from_utf8(&key[8..])?;
347
348    Ok((id, screen_name))
349}
350
351lazy_static::lazy_static! {
352    /// Date of the first tweet
353    static ref TWITTER_EPOCH: NaiveDate = NaiveDate::from_ymd(2006, 3, 21);
354}
355
356fn date_to_day_id(date: &NaiveDate) -> Result<u16, Error> {
357    let day = (*date - *TWITTER_EPOCH).num_days();
358    day.try_into().map_err(|_| Error::InvalidDay(day))
359}
360
361fn day_id_to_date(day_id: u16) -> NaiveDate {
362    *TWITTER_EPOCH + Duration::days(day_id.into())
363}
364
365fn value_to_dates(value: &[u8]) -> Result<Vec<NaiveDate>, Error> {
366    let count = value.len() / 2;
367    let mut result = Vec::with_capacity(count);
368
369    for i in 0..count {
370        let day_id = u16::from_be_bytes(
371            value[i * 2..(i * 2 + 2)]
372                .try_into()
373                .map_err(|_| Error::InvalidValue(value.to_vec()))?,
374        );
375        result.push(day_id_to_date(day_id));
376    }
377
378    result.sort();
379    Ok(result)
380}