libp2p_kad/record/store/
memory.rs

1// Copyright 2019 Parity Technologies (UK) Ltd.
2//
3// Permission is hereby granted, free of charge, to any person obtaining a
4// copy of this software and associated documentation files (the "Software"),
5// to deal in the Software without restriction, including without limitation
6// the rights to use, copy, modify, merge, publish, distribute, sublicense,
7// and/or sell copies of the Software, and to permit persons to whom the
8// Software is furnished to do so, subject to the following conditions:
9//
10// The above copyright notice and this permission notice shall be included in
11// all copies or substantial portions of the Software.
12//
13// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
14// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
15// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
16// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
17// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
18// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
19// DEALINGS IN THE SOFTWARE.
20
21use super::*;
22
23use crate::kbucket;
24use libp2p_core::PeerId;
25use smallvec::SmallVec;
26use std::borrow::Cow;
27use std::collections::{hash_map, hash_set, HashMap, HashSet};
28use std::iter;
29
30/// In-memory implementation of a `RecordStore`.
31pub struct MemoryStore {
32    /// The identity of the peer owning the store.
33    local_key: kbucket::Key<PeerId>,
34    /// The configuration of the store.
35    config: MemoryStoreConfig,
36    /// The stored (regular) records.
37    records: HashMap<Key, Record>,
38    /// The stored provider records.
39    providers: HashMap<Key, SmallVec<[ProviderRecord; K_VALUE.get()]>>,
40    /// The set of all provider records for the node identified by `local_key`.
41    ///
42    /// Must be kept in sync with `providers`.
43    provided: HashSet<ProviderRecord>,
44}
45
46/// Configuration for a `MemoryStore`.
47pub struct MemoryStoreConfig {
48    /// The maximum number of records.
49    pub max_records: usize,
50    /// The maximum size of record values, in bytes.
51    pub max_value_bytes: usize,
52    /// The maximum number of providers stored for a key.
53    ///
54    /// This should match up with the chosen replication factor.
55    pub max_providers_per_key: usize,
56    /// The maximum number of provider records for which the
57    /// local node is the provider.
58    pub max_provided_keys: usize,
59}
60
61impl Default for MemoryStoreConfig {
62    fn default() -> Self {
63        Self {
64            max_records: 1024,
65            max_value_bytes: 65 * 1024,
66            max_provided_keys: 1024,
67            max_providers_per_key: K_VALUE.get(),
68        }
69    }
70}
71
72impl MemoryStore {
73    /// Creates a new `MemoryRecordStore` with a default configuration.
74    pub fn new(local_id: PeerId) -> Self {
75        Self::with_config(local_id, Default::default())
76    }
77
78    /// Creates a new `MemoryRecordStore` with the given configuration.
79    pub fn with_config(local_id: PeerId, config: MemoryStoreConfig) -> Self {
80        MemoryStore {
81            local_key: kbucket::Key::from(local_id),
82            config,
83            records: HashMap::default(),
84            provided: HashSet::default(),
85            providers: HashMap::default(),
86        }
87    }
88
89    /// Retains the records satisfying a predicate.
90    pub fn retain<F>(&mut self, f: F)
91    where
92        F: FnMut(&Key, &mut Record) -> bool
93    {
94        self.records.retain(f);
95    }
96}
97
98impl<'a> RecordStore<'a> for MemoryStore {
99    type RecordsIter = iter::Map<
100        hash_map::Values<'a, Key, Record>,
101        fn(&'a Record) -> Cow<'a, Record>
102    >;
103
104    type ProvidedIter = iter::Map<
105        hash_set::Iter<'a, ProviderRecord>,
106        fn(&'a ProviderRecord) -> Cow<'a, ProviderRecord>
107    >;
108
109    fn get(&'a self, k: &Key) -> Option<Cow<'_, Record>> {
110        self.records.get(k).map(Cow::Borrowed)
111    }
112
113    fn put(&'a mut self, r: Record) -> Result<()> {
114        if r.value.len() >= self.config.max_value_bytes {
115            return Err(Error::ValueTooLarge)
116        }
117
118        let num_records = self.records.len();
119
120        match self.records.entry(r.key.clone()) {
121            hash_map::Entry::Occupied(mut e) => {
122                e.insert(r);
123            }
124            hash_map::Entry::Vacant(e) => {
125                if num_records >= self.config.max_records {
126                    return Err(Error::MaxRecords)
127                }
128                e.insert(r);
129            }
130        }
131
132        Ok(())
133    }
134
135    fn remove(&'a mut self, k: &Key) {
136        self.records.remove(k);
137    }
138
139    fn records(&'a self) -> Self::RecordsIter {
140        self.records.values().map(Cow::Borrowed)
141    }
142
143    fn add_provider(&'a mut self, record: ProviderRecord) -> Result<()> {
144        let num_keys = self.providers.len();
145
146        // Obtain the entry
147        let providers = match self.providers.entry(record.key.clone()) {
148            e@hash_map::Entry::Occupied(_) => e,
149            e@hash_map::Entry::Vacant(_) => {
150                if self.config.max_provided_keys == num_keys {
151                    return Err(Error::MaxProvidedKeys)
152                }
153                e
154            }
155        }.or_insert_with(Default::default);
156
157        if let Some(i) = providers.iter().position(|p| p.provider == record.provider) {
158            // In-place update of an existing provider record.
159            providers.as_mut()[i] = record;
160        } else {
161            // It is a new provider record for that key.
162            let local_key = self.local_key.clone();
163            let key = kbucket::Key::new(record.key.clone());
164            let provider = kbucket::Key::from(record.provider);
165            if let Some(i) = providers.iter().position(|p| {
166                let pk = kbucket::Key::from(p.provider);
167                provider.distance(&key) < pk.distance(&key)
168            }) {
169                // Insert the new provider.
170                if local_key.preimage() == &record.provider {
171                    self.provided.insert(record.clone());
172                }
173                providers.insert(i, record);
174                // Remove the excess provider, if any.
175                if providers.len() > self.config.max_providers_per_key {
176                    if let Some(p) = providers.pop() {
177                        self.provided.remove(&p);
178                    }
179                }
180            }
181            else if providers.len() < self.config.max_providers_per_key {
182                // The distance of the new provider to the key is larger than
183                // the distance of any existing provider, but there is still room.
184                if local_key.preimage() == &record.provider {
185                    self.provided.insert(record.clone());
186                }
187                providers.push(record);
188            }
189        }
190        Ok(())
191    }
192
193    fn providers(&'a self, key: &Key) -> Vec<ProviderRecord> {
194        self.providers.get(key).map_or_else(Vec::new, |ps| ps.clone().into_vec())
195    }
196
197    fn provided(&'a self) -> Self::ProvidedIter {
198        self.provided.iter().map(Cow::Borrowed)
199    }
200
201    fn remove_provider(&'a mut self, key: &Key, provider: &PeerId) {
202        if let hash_map::Entry::Occupied(mut e) = self.providers.entry(key.clone()) {
203            let providers = e.get_mut();
204            if let Some(i) = providers.iter().position(|p| &p.provider == provider) {
205                let p = providers.remove(i);
206                self.provided.remove(&p);
207            }
208            if providers.is_empty() {
209                e.remove();
210            }
211        }
212    }
213}
214
215#[cfg(test)]
216mod tests {
217    use super::*;
218    use libp2p_core::multihash::{Code, Multihash};
219    use quickcheck::*;
220    use rand::Rng;
221
222    fn random_multihash() -> Multihash {
223        Multihash::wrap(Code::Sha2_256.into(), &rand::thread_rng().gen::<[u8; 32]>()).unwrap()
224    }
225
226    fn distance(r: &ProviderRecord) -> kbucket::Distance {
227        kbucket::Key::new(r.key.clone())
228            .distance(&kbucket::Key::from(r.provider))
229    }
230
231    #[test]
232    fn put_get_remove_record() {
233        fn prop(r: Record) {
234            let mut store = MemoryStore::new(PeerId::random());
235            assert!(store.put(r.clone()).is_ok());
236            assert_eq!(Some(Cow::Borrowed(&r)), store.get(&r.key));
237            store.remove(&r.key);
238            assert!(store.get(&r.key).is_none());
239        }
240        quickcheck(prop as fn(_))
241    }
242
243    #[test]
244    fn add_get_remove_provider() {
245        fn prop(r: ProviderRecord) {
246            let mut store = MemoryStore::new(PeerId::random());
247            assert!(store.add_provider(r.clone()).is_ok());
248            assert!(store.providers(&r.key).contains(&r));
249            store.remove_provider(&r.key, &r.provider);
250            assert!(!store.providers(&r.key).contains(&r));
251        }
252        quickcheck(prop as fn(_))
253    }
254
255    #[test]
256    fn providers_ordered_by_distance_to_key() {
257        fn prop(providers: Vec<kbucket::Key<PeerId>>) -> bool {
258            let mut store = MemoryStore::new(PeerId::random());
259            let key = Key::from(random_multihash());
260
261            let mut records = providers.into_iter().map(|p| {
262                ProviderRecord::new(key.clone(), p.into_preimage(), Vec::new())
263            }).collect::<Vec<_>>();
264
265            for r in &records {
266                assert!(store.add_provider(r.clone()).is_ok());
267            }
268
269            records.sort_by(|r1, r2| distance(r1).cmp(&distance(r2)));
270            records.truncate(store.config.max_providers_per_key);
271
272            records == store.providers(&key).to_vec()
273        }
274
275        quickcheck(prop as fn(_) -> _)
276    }
277
278    #[test]
279    fn provided() {
280        let id = PeerId::random();
281        let mut store = MemoryStore::new(id.clone());
282        let key = random_multihash();
283        let rec = ProviderRecord::new(key, id.clone(), Vec::new());
284        assert!(store.add_provider(rec.clone()).is_ok());
285        assert_eq!(vec![Cow::Borrowed(&rec)], store.provided().collect::<Vec<_>>());
286        store.remove_provider(&rec.key, &id);
287        assert_eq!(store.provided().count(), 0);
288    }
289
290    #[test]
291    fn update_provider() {
292        let mut store = MemoryStore::new(PeerId::random());
293        let key = random_multihash();
294        let prv = PeerId::random();
295        let mut rec = ProviderRecord::new(key, prv, Vec::new());
296        assert!(store.add_provider(rec.clone()).is_ok());
297        assert_eq!(vec![rec.clone()], store.providers(&rec.key).to_vec());
298        rec.expires = Some(Instant::now());
299        assert!(store.add_provider(rec.clone()).is_ok());
300        assert_eq!(vec![rec.clone()], store.providers(&rec.key).to_vec());
301    }
302
303    #[test]
304    fn max_provided_keys() {
305        let mut store = MemoryStore::new(PeerId::random());
306        for _ in 0 .. store.config.max_provided_keys {
307            let key = random_multihash();
308            let prv = PeerId::random();
309            let rec = ProviderRecord::new(key, prv, Vec::new());
310            let _ = store.add_provider(rec);
311        }
312        let key = random_multihash();
313        let prv = PeerId::random();
314        let rec = ProviderRecord::new(key, prv, Vec::new());
315        match store.add_provider(rec) {
316            Err(Error::MaxProvidedKeys) => {}
317            _ => panic!("Unexpected result"),
318        }
319    }
320}