memory_lol/db/
screen_names.rs

1use super::{
2    accounts::AccountTable,
3    table::{Mode, Table, Writeable},
4    Error,
5};
6use rocksdb::{IteratorMode, MergeOperands, Options, DB};
7use std::convert::TryInto;
8use std::marker::PhantomData;
9use std::path::Path;
10
/// Summary statistics produced by a full scan of a screen-name table.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ScreenNameTableCounts {
    /// Number of keys, i.e. distinct lowercased screen names.
    pub screen_name_count: u64,
    /// Total number of screen name → ID associations summed over all keys.
    pub mapping_count: u64,
}
16
17pub struct ScreenNameTable<M> {
18    db: Option<DB>,
19    mode: PhantomData<M>,
20}
21
22impl<M> Table for ScreenNameTable<M> {
23    type Counts = ScreenNameTableCounts;
24
25    fn underlying(&self) -> &DB {
26        self.db.as_ref().unwrap()
27    }
28
29    fn get_counts(&self) -> Result<Self::Counts, Error> {
30        let mut screen_name_count = 0;
31        let mut mapping_count = 0;
32
33        let iter = self.db.as_ref().unwrap().iterator(IteratorMode::Start);
34
35        for result in iter {
36            let (_, value) = result?;
37
38            screen_name_count += 1;
39            let value_len = value.len();
40
41            if value_len % 8 == 0 {
42                mapping_count += value_len as u64 / 8;
43            } else {
44                return Err(Error::InvalidValue(value.to_vec()));
45            }
46        }
47
48        Ok(Self::Counts {
49            screen_name_count,
50            mapping_count,
51        })
52    }
53}
54
55impl<M> ScreenNameTable<M> {
56    fn make_options() -> Options {
57        let mut options = Options::default();
58        options.create_if_missing(true);
59        options.set_merge_operator_associative("merge", merge);
60        options
61    }
62
63    pub fn lookup(&self, screen_name: &str) -> Result<Vec<u64>, Error> {
64        let value = self
65            .db
66            .as_ref()
67            .unwrap()
68            .get_pinned(screen_name_to_key(screen_name))?;
69        value
70            .as_ref()
71            .map(|value| value_to_ids(value))
72            .unwrap_or_else(|| Ok(vec![]))
73    }
74
75    pub fn lookup_by_prefix(
76        &self,
77        screen_name: &str,
78        limit: usize,
79    ) -> Result<Vec<(String, Vec<u64>)>, Error> {
80        let prefix = screen_name_to_key(screen_name);
81        let iter = self.db.as_ref().unwrap().prefix_iterator(&prefix);
82        let mut results = Vec::with_capacity(1);
83
84        for result in iter.take(limit) {
85            let (key, value) = result?;
86
87            if key.starts_with(&prefix) {
88                let screen_name = key_to_screen_name(&key)?;
89                let ids = value_to_ids(&value)?;
90
91                results.push((screen_name.to_string(), ids));
92            } else {
93                break;
94            }
95        }
96
97        Ok(results)
98    }
99
100    pub fn get_most_reused(&self, k: usize) -> Result<Vec<(String, Vec<u64>)>, Error> {
101        let mut queue = priority_queue::DoublePriorityQueue::with_capacity(k);
102        let iter = self.db.as_ref().unwrap().iterator(IteratorMode::Start);
103
104        for result in iter {
105            let (key, value) = result?;
106            let screen_name = key_to_screen_name(&key)?;
107            let ids = value_to_ids(&value)?;
108
109            let min = queue.peek_min().map(|(_, count)| *count).unwrap_or(0);
110            let len = ids.len();
111
112            if len >= min || queue.len() < k {
113                queue.push((screen_name.to_string(), ids), len);
114
115                if queue.len() > k {
116                    queue.pop_min();
117                }
118            }
119        }
120
121        Ok(queue.into_descending_sorted_vec())
122    }
123}
124
125impl<M: Mode> ScreenNameTable<M> {
126    pub fn open<P: AsRef<Path>>(path: P) -> Result<Self, Error> {
127        let options = Self::make_options();
128        let db = if M::is_read_only() {
129            DB::open_for_read_only(&options, path, true)?
130        } else {
131            DB::open(&options, path)?
132        };
133
134        Ok(Self {
135            db: Some(db),
136            mode: PhantomData,
137        })
138    }
139}
140
141impl ScreenNameTable<Writeable> {
142    pub fn insert(&self, screen_name: &str, id: u64) -> Result<(), Error> {
143        Ok(self
144            .db
145            .as_ref()
146            .unwrap()
147            .merge(screen_name_to_key(screen_name), id.to_be_bytes())?)
148    }
149
150    pub fn rebuild<Mode>(&mut self, accounts: &AccountTable<Mode>) -> Result<(), Error> {
151        let path = self.db.as_ref().unwrap().path().to_path_buf();
152        self.db.take().unwrap();
153
154        let options = Self::make_options();
155
156        DB::destroy(&options, &path)?;
157
158        self.db = Some(DB::open(&options, &path)?);
159
160        for pair in accounts.pairs() {
161            let (id, screen_name, _) = pair?;
162
163            self.insert(&screen_name, id)?;
164        }
165
166        Ok(())
167    }
168}
169
170fn merge(
171    _new_key: &[u8],
172    existing_val: Option<&[u8]>,
173    operands: &MergeOperands,
174) -> Option<Vec<u8>> {
175    let mut new_val = match existing_val {
176        Some(bytes) => bytes.to_vec(),
177        None => Vec::with_capacity(operands.len() * 8),
178    };
179
180    for operand in operands.iter() {
181        merge_for_screen_name(&mut new_val, operand);
182    }
183
184    Some(new_val)
185}
186
187fn merge_for_screen_name(a: &mut Vec<u8>, b: &[u8]) {
188    let original_len = a.len();
189    let mut i = 0;
190
191    while i < b.len() {
192        let bytes: [u8; 8] = match b[i..i + 8].try_into() {
193            Ok(bytes) => bytes,
194            Err(error) => {
195                log::error!("{}", error);
196                return;
197            }
198        };
199        let next_b = u64::from_be_bytes(bytes);
200
201        let mut found = false;
202        let mut j = 0;
203
204        while !found && j < original_len {
205            let bytes = match a[j..j + 8].try_into() {
206                Ok(bytes) => bytes,
207                Err(error) => {
208                    log::error!("{}", error);
209                    return;
210                }
211            };
212            let next_a = u64::from_be_bytes(bytes);
213            found = next_a == next_b;
214            j += 8;
215        }
216
217        if !found {
218            a.extend_from_slice(&b[i..i + 8]);
219        }
220        i += 8;
221    }
222}
223
/// Normalizes a screen name into its table key.
///
/// The key is the UTF-8 bytes of the name's lowercase form, which makes lookups
/// case-insensitive.
fn screen_name_to_key(screen_name: &str) -> Vec<u8> {
    // `into_bytes` reuses the lowercased `String`'s buffer instead of copying it.
    screen_name.to_lowercase().into_bytes()
}
228
229fn key_to_screen_name(key: &[u8]) -> Result<&str, Error> {
230    Ok(std::str::from_utf8(key)?)
231}
232
233fn value_to_ids(value: &[u8]) -> Result<Vec<u64>, Error> {
234    let mut result = Vec::with_capacity(value.len() / 8);
235    let mut i = 0;
236
237    while i < value.len() {
238        let id = u64::from_be_bytes(
239            value[i..i + 8]
240                .try_into()
241                .map_err(|_| Error::InvalidValue(value.to_vec()))?,
242        );
243        result.push(id);
244        i += 8;
245    }
246
247    Ok(result)
248}