Skip to main content

rust_ipfs/repo/store/datastore/
memory.rs

1use crate::error::Error;
2use crate::repo::{DataStore, PinKind, PinMode, PinModeRequirement, PinStore};
3use futures::StreamExt;
4use ipld_core::cid::{self, Cid};
5use std::path::PathBuf;
6use tokio::sync::{Mutex, OwnedMutexGuard};
7
8use std::collections::hash_map::Entry;
9
10// FIXME: Transition to Persistent Map to make iterating more consistent
11use serde::{Deserialize, Serialize};
12use std::collections::HashMap;
13use std::sync::Arc;
14
/// Describes an in-memory `DataStore`.
///
/// Both maps sit behind an `Arc<Mutex<_>>` (tokio mutex), so every clone of a `MemDataStore`
/// shares the same underlying storage.
#[derive(Clone, Debug, Default)]
pub struct MemDataStore {
    /// Plain key-value entries served through the `DataStore` implementation.
    inner: Arc<Mutex<HashMap<Vec<u8>, Vec<u8>>>>,
    /// Pin documents keyed by `Cid::to_bytes()` of the pinned CID; values are JSON-serialized
    /// `PinDocument`s.
    // this could also be PinDocument however doing any serialization allows to see the required
    // error types easier
    pin: Arc<Mutex<HashMap<Vec<u8>, Vec<u8>>>>,
}
23
24impl MemDataStore {
25    pub fn new(_: PathBuf) -> Self {
26        Default::default()
27    }
28
29    /// Returns true if the pin document was changed, false otherwise.
30    fn insert_pin<'a>(
31        g: &mut OwnedMutexGuard<HashMap<Vec<u8>, Vec<u8>>>,
32        target: &'a Cid,
33        kind: &'a PinKind<&'_ Cid>,
34    ) -> Result<bool, Error> {
35        // rationale for storing as Cid: the same multihash can be pinned with different codecs.
36        // even if there aren't many polyglot documents known, pair of raw and the actual codec is
37        // always a possibility.
38        let key = target.to_bytes();
39
40        match g.entry(key) {
41            Entry::Occupied(mut oe) => {
42                let mut doc: PinDocument = serde_json::from_slice(oe.get())?;
43                if doc.update(true, kind)? {
44                    let vec = oe.get_mut();
45                    vec.clear();
46                    serde_json::to_writer(vec, &doc)?;
47                    trace!(doc = ?doc, kind = ?kind, "updated on insert");
48                    Ok(true)
49                } else {
50                    trace!(doc = ?doc, kind = ?kind, "update not needed on insert");
51                    Ok(false)
52                }
53            }
54            Entry::Vacant(ve) => {
55                let mut doc = PinDocument {
56                    version: 0,
57                    direct: false,
58                    recursive: Recursive::Not,
59                    cid_version: match target.version() {
60                        cid::Version::V0 => 0,
61                        cid::Version::V1 => 1,
62                    },
63                    indirect_by: Vec::new(),
64                };
65
66                doc.update(true, kind).unwrap();
67                let vec = serde_json::to_vec(&doc)?;
68                ve.insert(vec);
69                trace!(doc = ?doc, kind = ?kind, "created on insert");
70                Ok(true)
71            }
72        }
73    }
74
75    /// Returns true if the pin document was changed, false otherwise.
76    fn remove_pin<'a>(
77        g: &mut OwnedMutexGuard<HashMap<Vec<u8>, Vec<u8>>>,
78        target: &'a Cid,
79        kind: &'a PinKind<&'_ Cid>,
80    ) -> Result<bool, Error> {
81        // see cid vs. multihash from [`insert_direct_pin`]
82        let key = target.to_bytes();
83
84        match g.entry(key) {
85            Entry::Occupied(mut oe) => {
86                let mut doc: PinDocument = serde_json::from_slice(oe.get())?;
87                if !doc.update(false, kind)? {
88                    trace!(doc = ?doc, kind = ?kind, "update not needed on removal");
89                    return Ok(false);
90                }
91
92                if doc.can_remove() {
93                    oe.remove();
94                } else {
95                    let vec = oe.get_mut();
96                    vec.clear();
97                    serde_json::to_writer(vec, &doc)?;
98                }
99
100                Ok(true)
101            }
102            Entry::Vacant(_) => Err(anyhow::anyhow!("not pinned")),
103        }
104    }
105}
106
107impl PinStore for MemDataStore {
108    async fn is_pinned(&self, block: &Cid) -> Result<bool, Error> {
109        let key = block.to_bytes();
110
111        let g = self.pin.lock().await;
112
113        // the use of PinKind::RecursiveIntention necessitates the only return fast for
114        // only the known pins; we should somehow now query to see if there are any
115        // RecursiveIntention's. If there are any, we must walk the refs of each to see if the
116        // `block` is amongst of those recursive references which are not yet written to disk.
117        //
118        // doing this without holding a repo lock is not possible, so leaving this as partial
119        // implementation right now.
120        Ok(g.contains_key(&key))
121    }
122
123    async fn insert_direct_pin(&self, target: &Cid) -> Result<(), Error> {
124        let mut g = Mutex::lock_owned(Arc::clone(&self.pin)).await;
125        Self::insert_pin(&mut g, target, &PinKind::Direct)?;
126        Ok(())
127    }
128
129    async fn remove_direct_pin(&self, target: &Cid) -> Result<(), Error> {
130        let mut g = Mutex::lock_owned(Arc::clone(&self.pin)).await;
131        Self::remove_pin(&mut g, target, &PinKind::Direct)?;
132        Ok(())
133    }
134
135    async fn insert_recursive_pin(
136        &self,
137        target: &Cid,
138        mut refs: crate::repo::References<'_>,
139    ) -> Result<(), Error> {
140        use futures::stream::TryStreamExt;
141
142        let mut g = Mutex::lock_owned(Arc::clone(&self.pin)).await;
143
144        // this must fail if it is already fully pinned
145        Self::insert_pin(&mut g, target, &PinKind::RecursiveIntention)?;
146
147        let target_v1 = if target.version() == cid::Version::V1 {
148            target.to_owned()
149        } else {
150            // this is one more allocation
151            Cid::new_v1(target.codec(), target.hash().to_owned())
152        };
153
154        // collect these before even if they are many ... not sure if this is a good idea but, the
155        // inmem version doesn't need to be all that great. this could be for nothing, if the root
156        // was already pinned.
157
158        let mut count = 0;
159        let kind = PinKind::IndirectFrom(&target_v1);
160        while let Some(next) = refs.try_next().await? {
161            // no rollback, nothing
162            Self::insert_pin(&mut g, &next, &kind)?;
163            count += 1;
164        }
165
166        let kind = PinKind::Recursive(count as u64);
167        Self::insert_pin(&mut g, target, &kind)?;
168
169        Ok(())
170    }
171
172    async fn remove_recursive_pin(
173        &self,
174        target: &Cid,
175        mut refs: crate::repo::References<'_>,
176    ) -> Result<(), Error> {
177        use futures::TryStreamExt;
178
179        let mut g = Mutex::lock_owned(Arc::clone(&self.pin)).await;
180
181        let doc: PinDocument = match g.get(&target.to_bytes()) {
182            Some(raw) => match serde_json::from_slice(raw) {
183                Ok(doc) => doc,
184                Err(e) => return Err(e.into()),
185            },
186            // well we know it's not pinned at all but this is the general error message
187            None => return Err(anyhow::anyhow!("not pinned or pinned indirectly")),
188        };
189
190        let kind = match doc.pick_kind() {
191            Some(Ok(kind @ PinKind::Recursive(_)))
192            | Some(Ok(kind @ PinKind::RecursiveIntention)) => kind,
193            Some(Ok(PinKind::Direct)) => {
194                Self::remove_pin(&mut g, target, &PinKind::Direct)?;
195                return Ok(());
196            }
197            Some(Ok(PinKind::IndirectFrom(cid))) => {
198                return Err(anyhow::anyhow!("pinned indirectly through {}", cid));
199            }
200            // same here as above with the same message
201            _ => return Err(anyhow::anyhow!("not pinned or pinned indirectly")),
202        };
203
204        // this must fail if it is already fully pinned
205        Self::remove_pin(&mut g, target, &kind.as_ref())?;
206
207        let target_v1 = if target.version() == cid::Version::V1 {
208            target.to_owned()
209        } else {
210            // this is one more allocation
211            Cid::new_v1(target.codec(), target.hash().to_owned())
212        };
213
214        let kind = PinKind::IndirectFrom(&target_v1);
215        while let Some(next) = refs.try_next().await? {
216            // no rollback, nothing
217            Self::remove_pin(&mut g, &next, &kind)?;
218        }
219
220        Ok(())
221    }
222
223    async fn list(
224        &self,
225        requirement: Option<PinMode>,
226    ) -> futures::stream::BoxStream<'static, Result<(Cid, PinMode), Error>> {
227        use futures::stream::StreamExt;
228        use std::convert::TryFrom;
229        let g = self.pin.lock().await;
230
231        let requirement = PinModeRequirement::from(requirement);
232
233        let copy = g
234            .iter()
235            .map(|(key, value)| {
236                let cid = Cid::try_from(key.as_slice())?;
237                let doc: PinDocument = serde_json::from_slice(value)?;
238                let mode = doc.mode().ok_or_else(|| anyhow::anyhow!("invalid mode"))?;
239
240                Ok((cid, mode))
241            })
242            .filter(move |res| {
243                // could return just two different boxed streams
244                match res {
245                    Ok((_, mode)) => requirement.matches(mode),
246                    Err(_) => true,
247                }
248            })
249            .collect::<Vec<_>>();
250
251        futures::stream::iter(copy).boxed()
252    }
253
254    async fn query(
255        &self,
256        cids: Vec<Cid>,
257        requirement: Option<PinMode>,
258    ) -> Result<Vec<(Cid, PinKind<Cid>)>, Error> {
259        let g = self.pin.lock().await;
260
261        let requirement = PinModeRequirement::from(requirement);
262
263        cids.into_iter()
264            .map(move |cid| {
265                match g.get(&cid.to_bytes()) {
266                    Some(raw) => {
267                        let doc: PinDocument = match serde_json::from_slice(raw) {
268                            Ok(doc) => doc,
269                            Err(e) => return Err(e.into()),
270                        };
271                        // None from document is bad result, since the document shouldn't exist in the
272                        // first place
273                        let mode = match doc.pick_kind() {
274                            Some(Ok(kind)) => kind,
275                            Some(Err(invalid_cid)) => return Err(Error::new(invalid_cid)),
276                            None => {
277                                trace!(doc = ?doc, "could not pick pin kind");
278                                return Err(anyhow::anyhow!("{} is not pinned", cid));
279                            }
280                        };
281
282                        // would be more clear if this business was in a separate map; quite awful
283                        // as it is now
284
285                        let matches = requirement.matches(&mode);
286
287                        if matches {
288                            trace!(cid = %cid, req = ?requirement, "pin matches");
289                            return Ok((cid, mode));
290                        } else {
291                            // FIXME: this error is about the same as http api expects
292                            return Err(anyhow::anyhow!(
293                                "{} is not pinned as {:?}",
294                                cid,
295                                requirement
296                                    .required()
297                                    .expect("matches is never false if requirement is none")
298                            ));
299                        }
300                    }
301                    None => {
302                        trace!(cid = %cid, "no record found");
303                    }
304                }
305
306                // FIXME: this error is expected on http interface
307                Err(anyhow::anyhow!("{} is not pinned", cid))
308            })
309            .collect::<Result<Vec<_>, _>>()
310    }
311}
312
313impl DataStore for MemDataStore {
314    async fn init(&self) -> Result<(), Error> {
315        Ok(())
316    }
317
318    async fn contains(&self, key: &[u8]) -> Result<bool, Error> {
319        let contains = self.inner.lock().await.contains_key(key);
320        Ok(contains)
321    }
322
323    async fn get(&self, key: &[u8]) -> Result<Option<Vec<u8>>, Error> {
324        let value = self
325            .inner
326            .lock()
327            .await
328            .get(key)
329            .map(|value| value.to_owned());
330        Ok(value)
331    }
332
333    async fn put(&self, key: &[u8], value: &[u8]) -> Result<(), Error> {
334        self.inner
335            .lock()
336            .await
337            .insert(key.to_owned(), value.to_owned());
338        Ok(())
339    }
340
341    async fn remove(&self, key: &[u8]) -> Result<(), Error> {
342        self.inner.lock().await.remove(key);
343        Ok(())
344    }
345
346    async fn iter(&self) -> futures::stream::BoxStream<'static, (Vec<u8>, Vec<u8>)> {
347        let list = self.inner.lock().await.clone();
348
349        let stream = async_stream::stream! {
350            for (k, v) in list {
351                yield (k, v)
352            }
353        };
354
355        stream.boxed()
356    }
357}
358
/// Recursive pinning state stored inside a [`PinDocument`].
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
enum Recursive {
    /// Persistent record of **completed** recursive pinning. All references now have indirect pins
    /// recorded; the value is the number of references counted while pinning.
    Count(u64),
    /// Persistent record of the intent to record indirect pins on every block referenced by this
    /// root. The alternative would be to not record indirect pins at all (the go-ipfs way), which
    /// might not be a bad idea after all: writing every indirect pin causes massive write
    /// amplification, but this approach is kept until everything works.
    Intent,
    /// Not pinned recursively.
    Not,
}
372
373impl Recursive {
374    fn is_set(&self) -> bool {
375        match self {
376            Recursive::Count(_) | Recursive::Intent => true,
377            Recursive::Not => false,
378        }
379    }
380}
381
/// Per-CID pin record, stored as JSON in `MemDataStore::pin`.
///
/// Field names and their order define the serialized layout.
#[derive(Debug, Serialize, Deserialize)]
struct PinDocument {
    // document format version; written as 0 by this file
    version: u8,
    // whether the CID is pinned directly
    direct: bool,
    // how many descendants; something to check when walking
    recursive: Recursive,
    // no further metadata necessary; cids are pinned by full cid
    // (0 or 1, the version of the pinned cid)
    cid_version: u8,
    // using the cidv1 versions of all cids here, not sure if that makes sense or is important;
    // these are the recursive roots this block is indirectly pinned through, kept sorted and
    // deduplicated by `update`
    indirect_by: Vec<String>,
}
393
394impl PinDocument {
395    fn update(&mut self, add: bool, kind: &PinKind<&'_ Cid>) -> Result<bool, PinUpdateError> {
396        // these update rules are a bit complex and there are cases we don't need to handle.
397        // Updating on upon `PinKind` forces the caller to inspect what the current state is for
398        // example to handle the case of failing "unpin currently recursively pinned as direct".
399        // the ruleset seems quite strange to be honest.
400        match kind {
401            PinKind::IndirectFrom(root) => {
402                let root = if root.version() == cid::Version::V1 {
403                    root.to_string()
404                } else {
405                    // this is one more allocation
406                    Cid::new_v1(root.codec(), (*root).hash().to_owned()).to_string()
407                };
408
409                let modified = if self.indirect_by.is_empty() {
410                    if add {
411                        self.indirect_by.push(root);
412                        true
413                    } else {
414                        false
415                    }
416                } else {
417                    let mut set = self
418                        .indirect_by
419                        .drain(..)
420                        .collect::<std::collections::BTreeSet<_>>();
421
422                    let modified = if add {
423                        set.insert(root)
424                    } else {
425                        set.remove(&root)
426                    };
427
428                    self.indirect_by.extend(set);
429                    modified
430                };
431
432                Ok(modified)
433            }
434            PinKind::Direct => {
435                if self.recursive.is_set() && !self.direct && add {
436                    // go-ipfs: cannot make recursive pin also direct
437                    // not really sure why does this rule exist; the other way around is allowed
438                    return Err(PinUpdateError::AlreadyPinnedRecursive);
439                }
440
441                if !add && !self.direct {
442                    return match !self.recursive.is_set() {
443                        true => Err(PinUpdateError::CannotUnpinUnpinned),
444                        false => Err(PinUpdateError::CannotUnpinDirectOnRecursivelyPinned),
445                    };
446                }
447
448                let modified = self.direct != add;
449                self.direct = add;
450                Ok(modified)
451            }
452            PinKind::RecursiveIntention => {
453                let modified = if add {
454                    match self.recursive {
455                        Recursive::Count(_) => return Err(PinUpdateError::AlreadyPinnedRecursive),
456                        // can overwrite Intent with another Intent, as Ipfs::insert_pin is now moving to fix
457                        // the Intent into the "final form" of Recursive::Count.
458                        Recursive::Intent => false,
459                        Recursive::Not => {
460                            self.recursive = Recursive::Intent;
461                            self.direct = false;
462                            true
463                        }
464                    }
465                } else {
466                    match self.recursive {
467                        Recursive::Count(_) | Recursive::Intent => {
468                            self.recursive = Recursive::Not;
469                            true
470                        }
471                        Recursive::Not => false,
472                    }
473                };
474
475                Ok(modified)
476            }
477            PinKind::Recursive(descendants) => {
478                let descendants = *descendants;
479                let modified = if add {
480                    match self.recursive {
481                        Recursive::Count(other) if other != descendants => {
482                            return Err(PinUpdateError::UnexpectedNumberOfDescendants(
483                                other,
484                                descendants,
485                            ));
486                        }
487                        Recursive::Count(_) => false,
488                        Recursive::Intent | Recursive::Not => {
489                            self.recursive = Recursive::Count(descendants);
490                            // the previously direct has now been upgraded to recursive, it can
491                            // still be indirect though
492                            self.direct = false;
493                            true
494                        }
495                    }
496                } else {
497                    match self.recursive {
498                        Recursive::Count(other) if other != descendants => {
499                            return Err(PinUpdateError::UnexpectedNumberOfDescendants(
500                                other,
501                                descendants,
502                            ));
503                        }
504                        Recursive::Count(_) | Recursive::Intent => {
505                            self.recursive = Recursive::Not;
506                            true
507                        }
508                        Recursive::Not => return Err(PinUpdateError::NotPinnedRecursive),
509                    }
510                    // FIXME: removing ... not sure if this is an issue; was thinking that maybe
511                    // the update might need to be split to allow different api for removal than
512                    // addition.
513                };
514                Ok(modified)
515            }
516        }
517    }
518
519    fn can_remove(&self) -> bool {
520        !self.direct && !self.recursive.is_set() && self.indirect_by.is_empty()
521    }
522
523    fn mode(&self) -> Option<PinMode> {
524        if self.recursive.is_set() {
525            Some(PinMode::Recursive)
526        } else if !self.indirect_by.is_empty() {
527            Some(PinMode::Indirect)
528        } else if self.direct {
529            Some(PinMode::Direct)
530        } else {
531            None
532        }
533    }
534
535    fn pick_kind(&self) -> Option<Result<PinKind<Cid>, cid::Error>> {
536        self.mode().map(|p| {
537            Ok(match p {
538                PinMode::Recursive => match self.recursive {
539                    Recursive::Intent => PinKind::RecursiveIntention,
540                    Recursive::Count(total) => PinKind::Recursive(total),
541                    _ => unreachable!("mode should not have returned PinKind::Recursive"),
542                },
543                PinMode::Indirect => {
544                    // go-ipfs does seem to be doing a fifo looking, perhaps this is a list there, or
545                    // the indirect pins aren't being written down anywhere and they just refs from
546                    // recursive roots.
547                    let cid = Cid::try_from(self.indirect_by[0].as_str())?;
548                    PinKind::IndirectFrom(cid)
549                }
550                PinMode::Direct => PinKind::Direct,
551            })
552        })
553    }
554}
555
/// Describes the error variants for updates to object pinning.
#[derive(Debug, thiserror::Error)]
pub enum PinUpdateError {
    /// The current and expected descendants of an already recursively pinned object don't match.
    ///
    /// Field `.0` is the count currently recorded in the pin document; field `.1` is the count
    /// supplied with the update.
    #[error("unexpected number of descendants ({}), found {}", .1, .0)]
    UnexpectedNumberOfDescendants(u64, u64),
    /// Recursive update fails as it wasn't pinned recursively.
    #[error("not pinned recursively")]
    NotPinnedRecursive,
    /// Not allowed: Adding direct pin while pinned recursive.
    #[error("already pinned recursively")]
    AlreadyPinnedRecursive,
    /// Can't unpin something that is not pinned.
    #[error("not pinned or pinned indirectly")]
    CannotUnpinUnpinned,
    // go-ipfs prepends the ipfspath here
    /// Can't unpin direct on a recursively pinned object.
    #[error("is pinned recursively")]
    CannotUnpinDirectOnRecursivelyPinned,
}
576
// Instantiates the crate's `pinstore_interface_tests!` suite (defined elsewhere; presumably
// shared by all `PinStore` implementations) under the name `common_tests`, using
// `MemDataStore::new` as the store factory.
#[cfg(test)]
crate::pinstore_interface_tests!(
    common_tests,
    crate::repo::datastore::memory::MemDataStore::new
);
582
#[cfg(test)]
mod tests {
    use super::*;

    /// Round trip through the `DataStore` API: absent, stored, then removed again.
    #[tokio::test]
    async fn test_mem_datastore() {
        let store = MemDataStore::new(std::env::temp_dir());
        let key = [1, 2, 3, 4];
        let value = [5, 6, 7, 8];

        store.init().await.unwrap();

        // nothing stored yet; removing a missing key is not an error
        assert!(!store.contains(&key).await.unwrap());
        assert_eq!(store.get(&key).await.unwrap(), None);
        store.remove(&key).await.unwrap();

        store.put(&key, &value).await.unwrap();
        assert!(store.contains(&key).await.unwrap());
        assert_eq!(store.get(&key).await.unwrap(), Some(value.to_vec()));

        store.remove(&key).await.unwrap();
        assert!(!store.contains(&key).await.unwrap());
        assert_eq!(store.get(&key).await.unwrap(), None);
    }

    /// A blank document becomes a direct pin after a single direct insert.
    #[test]
    fn pindocument_on_direct_pin() {
        let mut doc = PinDocument {
            version: 0,
            direct: false,
            recursive: Recursive::Not,
            cid_version: 0,
            indirect_by: Vec::new(),
        };

        let changed = doc.update(true, &PinKind::Direct).unwrap();
        assert!(changed);

        assert_eq!(doc.mode(), Some(PinMode::Direct));
        let kind = doc.pick_kind().unwrap().unwrap();
        assert_eq!(kind, PinKind::Direct);
    }
}