memory_lol/db/
screen_names.rs1use super::{
2 accounts::AccountTable,
3 table::{Mode, Table, Writeable},
4 Error,
5};
6use rocksdb::{IteratorMode, MergeOperands, Options, DB};
7use std::convert::TryInto;
8use std::marker::PhantomData;
9use std::path::Path;
10
11#[derive(Clone, Debug, Eq, PartialEq)]
12pub struct ScreenNameTableCounts {
13 pub screen_name_count: u64,
14 pub mapping_count: u64,
15}
16
17pub struct ScreenNameTable<M> {
18 db: Option<DB>,
19 mode: PhantomData<M>,
20}
21
22impl<M> Table for ScreenNameTable<M> {
23 type Counts = ScreenNameTableCounts;
24
25 fn underlying(&self) -> &DB {
26 self.db.as_ref().unwrap()
27 }
28
29 fn get_counts(&self) -> Result<Self::Counts, Error> {
30 let mut screen_name_count = 0;
31 let mut mapping_count = 0;
32
33 let iter = self.db.as_ref().unwrap().iterator(IteratorMode::Start);
34
35 for result in iter {
36 let (_, value) = result?;
37
38 screen_name_count += 1;
39 let value_len = value.len();
40
41 if value_len % 8 == 0 {
42 mapping_count += value_len as u64 / 8;
43 } else {
44 return Err(Error::InvalidValue(value.to_vec()));
45 }
46 }
47
48 Ok(Self::Counts {
49 screen_name_count,
50 mapping_count,
51 })
52 }
53}
54
55impl<M> ScreenNameTable<M> {
56 fn make_options() -> Options {
57 let mut options = Options::default();
58 options.create_if_missing(true);
59 options.set_merge_operator_associative("merge", merge);
60 options
61 }
62
63 pub fn lookup(&self, screen_name: &str) -> Result<Vec<u64>, Error> {
64 let value = self
65 .db
66 .as_ref()
67 .unwrap()
68 .get_pinned(screen_name_to_key(screen_name))?;
69 value
70 .as_ref()
71 .map(|value| value_to_ids(value))
72 .unwrap_or_else(|| Ok(vec![]))
73 }
74
75 pub fn lookup_by_prefix(
76 &self,
77 screen_name: &str,
78 limit: usize,
79 ) -> Result<Vec<(String, Vec<u64>)>, Error> {
80 let prefix = screen_name_to_key(screen_name);
81 let iter = self.db.as_ref().unwrap().prefix_iterator(&prefix);
82 let mut results = Vec::with_capacity(1);
83
84 for result in iter.take(limit) {
85 let (key, value) = result?;
86
87 if key.starts_with(&prefix) {
88 let screen_name = key_to_screen_name(&key)?;
89 let ids = value_to_ids(&value)?;
90
91 results.push((screen_name.to_string(), ids));
92 } else {
93 break;
94 }
95 }
96
97 Ok(results)
98 }
99
100 pub fn get_most_reused(&self, k: usize) -> Result<Vec<(String, Vec<u64>)>, Error> {
101 let mut queue = priority_queue::DoublePriorityQueue::with_capacity(k);
102 let iter = self.db.as_ref().unwrap().iterator(IteratorMode::Start);
103
104 for result in iter {
105 let (key, value) = result?;
106 let screen_name = key_to_screen_name(&key)?;
107 let ids = value_to_ids(&value)?;
108
109 let min = queue.peek_min().map(|(_, count)| *count).unwrap_or(0);
110 let len = ids.len();
111
112 if len >= min || queue.len() < k {
113 queue.push((screen_name.to_string(), ids), len);
114
115 if queue.len() > k {
116 queue.pop_min();
117 }
118 }
119 }
120
121 Ok(queue.into_descending_sorted_vec())
122 }
123}
124
125impl<M: Mode> ScreenNameTable<M> {
126 pub fn open<P: AsRef<Path>>(path: P) -> Result<Self, Error> {
127 let options = Self::make_options();
128 let db = if M::is_read_only() {
129 DB::open_for_read_only(&options, path, true)?
130 } else {
131 DB::open(&options, path)?
132 };
133
134 Ok(Self {
135 db: Some(db),
136 mode: PhantomData,
137 })
138 }
139}
140
141impl ScreenNameTable<Writeable> {
142 pub fn insert(&self, screen_name: &str, id: u64) -> Result<(), Error> {
143 Ok(self
144 .db
145 .as_ref()
146 .unwrap()
147 .merge(screen_name_to_key(screen_name), id.to_be_bytes())?)
148 }
149
150 pub fn rebuild<Mode>(&mut self, accounts: &AccountTable<Mode>) -> Result<(), Error> {
151 let path = self.db.as_ref().unwrap().path().to_path_buf();
152 self.db.take().unwrap();
153
154 let options = Self::make_options();
155
156 DB::destroy(&options, &path)?;
157
158 self.db = Some(DB::open(&options, &path)?);
159
160 for pair in accounts.pairs() {
161 let (id, screen_name, _) = pair?;
162
163 self.insert(&screen_name, id)?;
164 }
165
166 Ok(())
167 }
168}
169
170fn merge(
171 _new_key: &[u8],
172 existing_val: Option<&[u8]>,
173 operands: &MergeOperands,
174) -> Option<Vec<u8>> {
175 let mut new_val = match existing_val {
176 Some(bytes) => bytes.to_vec(),
177 None => Vec::with_capacity(operands.len() * 8),
178 };
179
180 for operand in operands.iter() {
181 merge_for_screen_name(&mut new_val, operand);
182 }
183
184 Some(new_val)
185}
186
187fn merge_for_screen_name(a: &mut Vec<u8>, b: &[u8]) {
188 let original_len = a.len();
189 let mut i = 0;
190
191 while i < b.len() {
192 let bytes: [u8; 8] = match b[i..i + 8].try_into() {
193 Ok(bytes) => bytes,
194 Err(error) => {
195 log::error!("{}", error);
196 return;
197 }
198 };
199 let next_b = u64::from_be_bytes(bytes);
200
201 let mut found = false;
202 let mut j = 0;
203
204 while !found && j < original_len {
205 let bytes = match a[j..j + 8].try_into() {
206 Ok(bytes) => bytes,
207 Err(error) => {
208 log::error!("{}", error);
209 return;
210 }
211 };
212 let next_a = u64::from_be_bytes(bytes);
213 found = next_a == next_b;
214 j += 8;
215 }
216
217 if !found {
218 a.extend_from_slice(&b[i..i + 8]);
219 }
220 i += 8;
221 }
222}
223
224fn screen_name_to_key(screen_name: &str) -> Vec<u8> {
225 let form = screen_name.to_lowercase();
226 form.as_bytes().to_vec()
227}
228
229fn key_to_screen_name(key: &[u8]) -> Result<&str, Error> {
230 Ok(std::str::from_utf8(key)?)
231}
232
233fn value_to_ids(value: &[u8]) -> Result<Vec<u64>, Error> {
234 let mut result = Vec::with_capacity(value.len() / 8);
235 let mut i = 0;
236
237 while i < value.len() {
238 let id = u64::from_be_bytes(
239 value[i..i + 8]
240 .try_into()
241 .map_err(|_| Error::InvalidValue(value.to_vec()))?,
242 );
243 result.push(id);
244 i += 8;
245 }
246
247 Ok(result)
248}