1use super::{
2 table::{Mode, Table, Writeable},
3 util::is_valid_screen_name,
4 Error,
5};
6use chrono::{Duration, NaiveDate};
7use rocksdb::{DBIterator, IteratorMode, MergeOperands, Options, DB};
8use std::collections::HashMap;
9use std::convert::TryInto;
10use std::marker::PhantomData;
11use std::path::Path;
12
/// Summary statistics for an account table, as produced by `Table::get_counts`.
///
/// Both fields are plain counters, so the type is `Copy` and defaults to all zeros.
#[derive(Clone, Copy, Debug, Default, Eq, Hash, PartialEq)]
pub struct AccountTableCounts {
    /// Number of distinct account ids that have at least one stored screen name.
    pub id_count: u64,
    /// Total number of stored `(id, screen_name)` pairs.
    pub pair_count: u64,
}
18
/// RocksDB-backed table of account records keyed by `(id, screen_name)`.
///
/// Keys are the big-endian account id followed by the screen name's UTF-8 bytes; values are
/// concatenated big-endian two-byte day ids. `M` is a type-level mode marker: `open` asks
/// `M::is_read_only()` how to open the database, and mutating methods exist only on
/// `AccountTable<Writeable>`.
pub struct AccountTable<M> {
    /// The open RocksDB handle.
    db: DB,
    /// Zero-sized marker carrying the table's access mode.
    mode: PhantomData<M>,
}
23
24impl<M> Table for AccountTable<M> {
25 type Counts = AccountTableCounts;
26
27 fn underlying(&self) -> &DB {
28 &self.db
29 }
30
31 fn get_counts(&self) -> Result<Self::Counts, Error> {
32 let mut pair_count = 0;
33 let mut id_count = 0;
34 let mut last_id = 0;
35
36 let iter = self.db.iterator(IteratorMode::Start);
37
38 for result in iter {
39 let (key, _) = result?;
40 pair_count += 1;
41
42 let id = key_prefix_to_id(&key)?;
43
44 if id != last_id {
45 id_count += 1;
46 last_id = id;
47 }
48 }
49
50 Ok(Self::Counts {
51 id_count,
52 pair_count,
53 })
54 }
55}
56
57impl<M> AccountTable<M> {
58 pub fn pairs(&self) -> PairIterator {
59 PairIterator {
60 underlying: self.db.iterator(IteratorMode::Start),
61 }
62 }
63
64 pub fn lookup(&self, id: u64) -> Result<HashMap<String, Vec<NaiveDate>>, Error> {
65 let prefix = id_to_key_prefix(id);
66 let iter = self.db.prefix_iterator(prefix);
67 let mut results = HashMap::new();
68
69 for result in iter {
70 let (key, value) = result?;
71 let (next_id, next_screen_name) = key_to_pair(&key)?;
72
73 if next_id == id {
74 let dates = value_to_dates(&value)?;
75 results.insert(next_screen_name.to_string(), dates);
76 } else {
77 break;
78 }
79 }
80
81 Ok(results)
82 }
83
84 pub fn limited_lookup(
85 &self,
86 id: u64,
87 earliest: NaiveDate,
88 ) -> Result<HashMap<String, Vec<NaiveDate>>, Error> {
89 let prefix = id_to_key_prefix(id);
90 let iter = self.db.prefix_iterator(prefix);
91 let mut results = HashMap::new();
92
93 for result in iter {
94 let (key, value) = result?;
95 let (next_id, next_screen_name) = key_to_pair(&key)?;
96
97 if next_id == id {
98 let dates = value_to_dates(&value)?;
99 if dates.iter().any(|date| date >= &earliest) {
100 results.insert(next_screen_name.to_string(), dates);
101 }
102 } else {
103 break;
104 }
105 }
106
107 Ok(results)
108 }
109
110 pub fn get_date_counts(&self) -> Result<Vec<(NaiveDate, u64)>, Error> {
111 let mut map = HashMap::new();
112 let iter = self.db.iterator(IteratorMode::Start);
113
114 for result in iter {
115 let (_, value) = result?;
116 let dates = value_to_dates(&value)?;
117
118 for date in dates {
119 let count = map.entry(date).or_default();
120 *count += 1;
121 }
122 }
123
124 let mut result = map.into_iter().collect::<Vec<_>>();
125 result.sort();
126
127 Ok(result)
128 }
129
130 pub fn get_most_screen_names(&self, k: usize) -> Result<Vec<(u64, Vec<String>)>, Error> {
131 let mut queue = priority_queue::DoublePriorityQueue::with_capacity(k);
132 let iter = self.db.iterator(IteratorMode::Start);
133 let mut last_id = 0;
134 let mut current: Vec<String> = vec![];
135
136 for result in iter {
137 let (key, _) = result?;
138 let (id, screen_name) = key_to_pair(&key)?;
139
140 if id != last_id {
141 let min = queue.peek_min().map(|(_, count)| *count).unwrap_or(0);
142 let len = current.len();
143
144 if len >= min || queue.len() < k {
145 queue.push((last_id, current.drain(..).collect()), len);
146
147 if queue.len() > k {
148 queue.pop_min();
149 }
150 } else {
151 current.clear();
152 }
153
154 last_id = id;
155 }
156 current.push(screen_name.to_string());
157 }
158
159 Ok(queue.into_descending_sorted_vec())
160 }
161}
162
163impl<M: Mode> AccountTable<M> {
164 pub fn open<P: AsRef<Path>>(path: P) -> Result<Self, Error> {
165 let mut options = Options::default();
166 options.create_if_missing(true);
167 options.set_merge_operator_associative("merge", merge);
168
169 let db = if M::is_read_only() {
170 DB::open_for_read_only(&options, path, true)?
171 } else {
172 DB::open(&options, path)?
173 };
174
175 Ok(Self {
176 db,
177 mode: PhantomData,
178 })
179 }
180}
181
182impl AccountTable<Writeable> {
183 pub fn insert(&self, id: u64, screen_name: &str, dates: Vec<NaiveDate>) -> Result<(), Error> {
184 if is_valid_screen_name(screen_name) {
185 let mut value = Vec::with_capacity(2 * dates.len());
186
187 for date in dates {
188 value.extend_from_slice(&date_to_day_id(&date)?.to_be_bytes());
189 }
190
191 self.db.merge(pair_to_key(id, screen_name), value)?;
192
193 Ok(())
194 } else {
195 Err(Error::InvalidScreenName(screen_name.to_string()))
196 }
197 }
198
199 pub fn remove(&self, id: u64, screen_name: &str) -> Result<(), Error> {
200 let key = pair_to_key(id, screen_name);
201
202 Ok(self.db.delete(key)?)
203 }
204
205 pub fn compact_ranges(&self) -> Result<(), Error> {
206 let iter = self.db.iterator(IteratorMode::Start);
207
208 for result in iter {
209 let (key, value) = result?;
210 let mut dates = value_to_dates(&value)?;
211
212 if dates.len() > 2 {
214 dates.sort();
215 dates.dedup();
216
217 let compacted_dates = if dates.len() <= 2 {
218 dates
219 } else {
220 let mut compacted_dates = Vec::with_capacity(2);
221
222 if let Some(first) = dates.first() {
223 compacted_dates.push(*first);
224 }
225
226 if let Some(last) = dates.last() {
227 compacted_dates.push(*last);
228 }
229
230 compacted_dates
231 };
232
233 let mut new_value = Vec::with_capacity(2 * compacted_dates.len());
234
235 for date in compacted_dates {
236 new_value.extend_from_slice(&date_to_day_id(&date)?.to_be_bytes());
237 }
238
239 self.db.put(key, new_value)?;
240 }
241 }
242
243 Ok(())
244 }
245}
246
247fn merge(
248 _new_key: &[u8],
249 existing_val: Option<&[u8]>,
250 operands: &MergeOperands,
251) -> Option<Vec<u8>> {
252 let mut new_val = match existing_val {
253 Some(bytes) => bytes.to_vec(),
254 None => Vec::with_capacity(operands.len() * 2),
255 };
256
257 for operand in operands.iter() {
258 merge_for_pair(&mut new_val, operand);
259 }
260
261 Some(new_val)
262}
263
264fn merge_for_pair(a: &mut Vec<u8>, b: &[u8]) {
265 let original_len = a.len();
266 let mut i = 0;
267
268 while i < b.len() {
269 let bytes: [u8; 2] = match b[i..i + 2].try_into() {
270 Ok(bytes) => bytes,
271 Err(error) => {
272 log::error!("{}", error);
273 return;
274 }
275 };
276 let next_b = u16::from_be_bytes(bytes);
277
278 let mut found = false;
279 let mut j = 0;
280
281 while !found && j < original_len {
282 let bytes = match a[j..j + 2].try_into() {
283 Ok(bytes) => bytes,
284 Err(error) => {
285 log::error!("{}", error);
286 return;
287 }
288 };
289 let next_a = u16::from_be_bytes(bytes);
290 found = next_a == next_b;
291 j += 2;
292 }
293
294 if !found {
295 a.extend_from_slice(&b[i..i + 2]);
296 }
297 i += 2;
298 }
299}
300
301pub struct PairIterator<'a> {
302 underlying: DBIterator<'a>,
303}
304
305impl Iterator for PairIterator<'_> {
306 type Item = Result<(u64, String, Vec<NaiveDate>), Error>;
307
308 fn next(&mut self) -> Option<Self::Item> {
309 self.underlying.next().map(|result| {
310 result
311 .map_err(Error::from)
312 .and_then(|(key, value)| kv_to_item(&key, &value))
313 })
314 }
315}
316
317fn kv_to_item(key: &[u8], value: &[u8]) -> Result<(u64, String, Vec<NaiveDate>), Error> {
318 let (id, screen_name) = key_to_pair(key)?;
319 let dates = value_to_dates(value)?;
320
321 Ok((id, screen_name.to_string(), dates))
322}
323
/// Encodes an account id as the 8-byte big-endian key prefix.
///
/// Big-endian encoding makes bytewise key order match numeric id order.
fn id_to_key_prefix(id: u64) -> [u8; 8] {
    u64::to_be_bytes(id)
}
327
328fn key_prefix_to_id(key: &[u8]) -> Result<u64, Error> {
329 Ok(u64::from_be_bytes(
330 key[0..8]
331 .try_into()
332 .map_err(|_| Error::InvalidKey(key.to_vec()))?,
333 ))
334}
335
/// Builds the storage key for `(id, screen_name)`.
///
/// The key is the 8-byte big-endian id followed by the screen name's UTF-8 bytes, so all
/// keys for one id are adjacent.
fn pair_to_key(id: u64, screen_name: &str) -> Vec<u8> {
    let id_bytes = id.to_be_bytes();

    [&id_bytes[..], screen_name.as_bytes()].concat()
}
343
344fn key_to_pair(key: &[u8]) -> Result<(u64, &str), Error> {
345 let id = key_prefix_to_id(key)?;
346 let screen_name = std::str::from_utf8(&key[8..])?;
347
348 Ok((id, screen_name))
349}
350
351lazy_static::lazy_static! {
352 static ref TWITTER_EPOCH: NaiveDate = NaiveDate::from_ymd(2006, 3, 21);
354}
355
356fn date_to_day_id(date: &NaiveDate) -> Result<u16, Error> {
357 let day = (*date - *TWITTER_EPOCH).num_days();
358 day.try_into().map_err(|_| Error::InvalidDay(day))
359}
360
361fn day_id_to_date(day_id: u16) -> NaiveDate {
362 *TWITTER_EPOCH + Duration::days(day_id.into())
363}
364
365fn value_to_dates(value: &[u8]) -> Result<Vec<NaiveDate>, Error> {
366 let count = value.len() / 2;
367 let mut result = Vec::with_capacity(count);
368
369 for i in 0..count {
370 let day_id = u16::from_be_bytes(
371 value[i * 2..(i * 2 + 2)]
372 .try_into()
373 .map_err(|_| Error::InvalidValue(value.to_vec()))?,
374 );
375 result.push(day_id_to_date(day_id));
376 }
377
378 result.sort();
379 Ok(result)
380}