libp2p_kad/record/store/
memory.rs1use super::*;
22
23use crate::kbucket;
24use libp2p_core::PeerId;
25use smallvec::SmallVec;
26use std::borrow::Cow;
27use std::collections::{hash_map, hash_set, HashMap, HashSet};
28use std::iter;
29
30pub struct MemoryStore {
32 local_key: kbucket::Key<PeerId>,
34 config: MemoryStoreConfig,
36 records: HashMap<Key, Record>,
38 providers: HashMap<Key, SmallVec<[ProviderRecord; K_VALUE.get()]>>,
40 provided: HashSet<ProviderRecord>,
44}
45
/// Configuration (size limits) for a `MemoryStore`.
///
/// The type is plain data, so it derives `Debug` and `Clone` to let callers
/// log a configuration or reuse one for several stores.
#[derive(Debug, Clone)]
pub struct MemoryStoreConfig {
    /// The maximum number of regular records held by the store. Once
    /// reached, `put` rejects records for new keys with `Error::MaxRecords`;
    /// records for keys that are already present can still be replaced.
    pub max_records: usize,
    /// The size limit for record values, in bytes. `put` rejects a record
    /// with `Error::ValueTooLarge` when its value length is greater than
    /// *or equal to* this number.
    pub max_value_bytes: usize,
    /// The maximum number of provider records kept per key. When a list
    /// grows beyond this, `add_provider` drops its last element.
    pub max_providers_per_key: usize,
    /// The maximum number of distinct keys in the provider map. Once
    /// reached, `add_provider` rejects records for new keys with
    /// `Error::MaxProvidedKeys`.
    pub max_provided_keys: usize,
}
60
61impl Default for MemoryStoreConfig {
62 fn default() -> Self {
63 Self {
64 max_records: 1024,
65 max_value_bytes: 65 * 1024,
66 max_provided_keys: 1024,
67 max_providers_per_key: K_VALUE.get(),
68 }
69 }
70}
71
72impl MemoryStore {
73 pub fn new(local_id: PeerId) -> Self {
75 Self::with_config(local_id, Default::default())
76 }
77
78 pub fn with_config(local_id: PeerId, config: MemoryStoreConfig) -> Self {
80 MemoryStore {
81 local_key: kbucket::Key::from(local_id),
82 config,
83 records: HashMap::default(),
84 provided: HashSet::default(),
85 providers: HashMap::default(),
86 }
87 }
88
89 pub fn retain<F>(&mut self, f: F)
91 where
92 F: FnMut(&Key, &mut Record) -> bool
93 {
94 self.records.retain(f);
95 }
96}
97
98impl<'a> RecordStore<'a> for MemoryStore {
99 type RecordsIter = iter::Map<
100 hash_map::Values<'a, Key, Record>,
101 fn(&'a Record) -> Cow<'a, Record>
102 >;
103
104 type ProvidedIter = iter::Map<
105 hash_set::Iter<'a, ProviderRecord>,
106 fn(&'a ProviderRecord) -> Cow<'a, ProviderRecord>
107 >;
108
109 fn get(&'a self, k: &Key) -> Option<Cow<'_, Record>> {
110 self.records.get(k).map(Cow::Borrowed)
111 }
112
113 fn put(&'a mut self, r: Record) -> Result<()> {
114 if r.value.len() >= self.config.max_value_bytes {
115 return Err(Error::ValueTooLarge)
116 }
117
118 let num_records = self.records.len();
119
120 match self.records.entry(r.key.clone()) {
121 hash_map::Entry::Occupied(mut e) => {
122 e.insert(r);
123 }
124 hash_map::Entry::Vacant(e) => {
125 if num_records >= self.config.max_records {
126 return Err(Error::MaxRecords)
127 }
128 e.insert(r);
129 }
130 }
131
132 Ok(())
133 }
134
135 fn remove(&'a mut self, k: &Key) {
136 self.records.remove(k);
137 }
138
139 fn records(&'a self) -> Self::RecordsIter {
140 self.records.values().map(Cow::Borrowed)
141 }
142
143 fn add_provider(&'a mut self, record: ProviderRecord) -> Result<()> {
144 let num_keys = self.providers.len();
145
146 let providers = match self.providers.entry(record.key.clone()) {
148 e@hash_map::Entry::Occupied(_) => e,
149 e@hash_map::Entry::Vacant(_) => {
150 if self.config.max_provided_keys == num_keys {
151 return Err(Error::MaxProvidedKeys)
152 }
153 e
154 }
155 }.or_insert_with(Default::default);
156
157 if let Some(i) = providers.iter().position(|p| p.provider == record.provider) {
158 providers.as_mut()[i] = record;
160 } else {
161 let local_key = self.local_key.clone();
163 let key = kbucket::Key::new(record.key.clone());
164 let provider = kbucket::Key::from(record.provider);
165 if let Some(i) = providers.iter().position(|p| {
166 let pk = kbucket::Key::from(p.provider);
167 provider.distance(&key) < pk.distance(&key)
168 }) {
169 if local_key.preimage() == &record.provider {
171 self.provided.insert(record.clone());
172 }
173 providers.insert(i, record);
174 if providers.len() > self.config.max_providers_per_key {
176 if let Some(p) = providers.pop() {
177 self.provided.remove(&p);
178 }
179 }
180 }
181 else if providers.len() < self.config.max_providers_per_key {
182 if local_key.preimage() == &record.provider {
185 self.provided.insert(record.clone());
186 }
187 providers.push(record);
188 }
189 }
190 Ok(())
191 }
192
193 fn providers(&'a self, key: &Key) -> Vec<ProviderRecord> {
194 self.providers.get(key).map_or_else(Vec::new, |ps| ps.clone().into_vec())
195 }
196
197 fn provided(&'a self) -> Self::ProvidedIter {
198 self.provided.iter().map(Cow::Borrowed)
199 }
200
201 fn remove_provider(&'a mut self, key: &Key, provider: &PeerId) {
202 if let hash_map::Entry::Occupied(mut e) = self.providers.entry(key.clone()) {
203 let providers = e.get_mut();
204 if let Some(i) = providers.iter().position(|p| &p.provider == provider) {
205 let p = providers.remove(i);
206 self.provided.remove(&p);
207 }
208 if providers.is_empty() {
209 e.remove();
210 }
211 }
212 }
213}
214
#[cfg(test)]
mod tests {
    use super::*;
    use libp2p_core::multihash::{Code, Multihash};
    use quickcheck::*;
    use rand::Rng;

    /// Wraps 32 random bytes as a SHA2-256 multihash.
    fn random_multihash() -> Multihash {
        let digest: [u8; 32] = rand::thread_rng().gen();
        Multihash::wrap(Code::Sha2_256.into(), &digest).unwrap()
    }

    /// Builds a provider record with a random key, a random provider and no
    /// addresses.
    fn random_provider_record() -> ProviderRecord {
        let key = random_multihash();
        let prv = PeerId::random();
        ProviderRecord::new(key, prv, Vec::new())
    }

    /// Distance between a provider record's key and its provider.
    fn distance(r: &ProviderRecord) -> kbucket::Distance {
        let record_key = kbucket::Key::new(r.key.clone());
        let provider_key = kbucket::Key::from(r.provider);
        record_key.distance(&provider_key)
    }

    #[test]
    fn put_get_remove_record() {
        fn prop(record: Record) {
            let mut store = MemoryStore::new(PeerId::random());

            assert!(store.put(record.clone()).is_ok());
            assert_eq!(Some(Cow::Borrowed(&record)), store.get(&record.key));

            store.remove(&record.key);
            assert!(store.get(&record.key).is_none());
        }

        quickcheck(prop as fn(_))
    }

    #[test]
    fn add_get_remove_provider() {
        fn prop(record: ProviderRecord) {
            let mut store = MemoryStore::new(PeerId::random());

            assert!(store.add_provider(record.clone()).is_ok());
            assert!(store.providers(&record.key).contains(&record));

            store.remove_provider(&record.key, &record.provider);
            assert!(!store.providers(&record.key).contains(&record));
        }

        quickcheck(prop as fn(_))
    }

    #[test]
    fn providers_ordered_by_distance_to_key() {
        fn prop(providers: Vec<kbucket::Key<PeerId>>) -> bool {
            let mut store = MemoryStore::new(PeerId::random());
            let key = Key::from(random_multihash());

            let mut expected: Vec<ProviderRecord> = providers
                .into_iter()
                .map(|p| ProviderRecord::new(key.clone(), p.into_preimage(), Vec::new()))
                .collect();

            for record in expected.iter().cloned() {
                assert!(store.add_provider(record).is_ok());
            }

            // The store must hold the closest providers, closest first,
            // capped at the per-key limit.
            expected.sort_by_key(|r| distance(r));
            expected.truncate(store.config.max_providers_per_key);

            expected == store.providers(&key)
        }

        quickcheck(prop as fn(_) -> _)
    }

    #[test]
    fn provided() {
        let local = PeerId::random();
        let mut store = MemoryStore::new(local.clone());
        let rec = ProviderRecord::new(random_multihash(), local.clone(), Vec::new());

        assert!(store.add_provider(rec.clone()).is_ok());
        assert_eq!(
            vec![Cow::Borrowed(&rec)],
            store.provided().collect::<Vec<_>>()
        );

        store.remove_provider(&rec.key, &local);
        assert_eq!(store.provided().count(), 0);
    }

    #[test]
    fn update_provider() {
        let mut store = MemoryStore::new(PeerId::random());
        let mut rec = random_provider_record();

        assert!(store.add_provider(rec.clone()).is_ok());
        assert_eq!(vec![rec.clone()], store.providers(&rec.key));

        // Adding the same provider again must replace the stored record
        // rather than create a second entry.
        rec.expires = Some(Instant::now());
        assert!(store.add_provider(rec.clone()).is_ok());
        assert_eq!(vec![rec.clone()], store.providers(&rec.key));
    }

    #[test]
    fn max_provided_keys() {
        let mut store = MemoryStore::new(PeerId::random());

        // Fill the store up to its key limit ...
        for _ in 0..store.config.max_provided_keys {
            let _ = store.add_provider(random_provider_record());
        }

        // ... so that one more key must be rejected.
        let outcome = store.add_provider(random_provider_record());
        match outcome {
            Err(Error::MaxProvidedKeys) => {}
            _ => panic!("Unexpected result"),
        }
    }
}