1use crate::error::Error;
2use crate::repo::{DataStore, PinKind, PinMode, PinModeRequirement, PinStore};
3use futures::StreamExt;
4use ipld_core::cid::{self, Cid};
5use std::path::PathBuf;
6use tokio::sync::{Mutex, OwnedMutexGuard};
7
8use std::collections::hash_map::Entry;
9
10use serde::{Deserialize, Serialize};
12use std::collections::HashMap;
13use std::sync::Arc;
14
15#[derive(Clone, Debug, Default)]
17pub struct MemDataStore {
18 inner: Arc<Mutex<HashMap<Vec<u8>, Vec<u8>>>>,
19 pin: Arc<Mutex<HashMap<Vec<u8>, Vec<u8>>>>,
22}
23
24impl MemDataStore {
25 pub fn new(_: PathBuf) -> Self {
26 Default::default()
27 }
28
29 fn insert_pin<'a>(
31 g: &mut OwnedMutexGuard<HashMap<Vec<u8>, Vec<u8>>>,
32 target: &'a Cid,
33 kind: &'a PinKind<&'_ Cid>,
34 ) -> Result<bool, Error> {
35 let key = target.to_bytes();
39
40 match g.entry(key) {
41 Entry::Occupied(mut oe) => {
42 let mut doc: PinDocument = serde_json::from_slice(oe.get())?;
43 if doc.update(true, kind)? {
44 let vec = oe.get_mut();
45 vec.clear();
46 serde_json::to_writer(vec, &doc)?;
47 trace!(doc = ?doc, kind = ?kind, "updated on insert");
48 Ok(true)
49 } else {
50 trace!(doc = ?doc, kind = ?kind, "update not needed on insert");
51 Ok(false)
52 }
53 }
54 Entry::Vacant(ve) => {
55 let mut doc = PinDocument {
56 version: 0,
57 direct: false,
58 recursive: Recursive::Not,
59 cid_version: match target.version() {
60 cid::Version::V0 => 0,
61 cid::Version::V1 => 1,
62 },
63 indirect_by: Vec::new(),
64 };
65
66 doc.update(true, kind).unwrap();
67 let vec = serde_json::to_vec(&doc)?;
68 ve.insert(vec);
69 trace!(doc = ?doc, kind = ?kind, "created on insert");
70 Ok(true)
71 }
72 }
73 }
74
75 fn remove_pin<'a>(
77 g: &mut OwnedMutexGuard<HashMap<Vec<u8>, Vec<u8>>>,
78 target: &'a Cid,
79 kind: &'a PinKind<&'_ Cid>,
80 ) -> Result<bool, Error> {
81 let key = target.to_bytes();
83
84 match g.entry(key) {
85 Entry::Occupied(mut oe) => {
86 let mut doc: PinDocument = serde_json::from_slice(oe.get())?;
87 if !doc.update(false, kind)? {
88 trace!(doc = ?doc, kind = ?kind, "update not needed on removal");
89 return Ok(false);
90 }
91
92 if doc.can_remove() {
93 oe.remove();
94 } else {
95 let vec = oe.get_mut();
96 vec.clear();
97 serde_json::to_writer(vec, &doc)?;
98 }
99
100 Ok(true)
101 }
102 Entry::Vacant(_) => Err(anyhow::anyhow!("not pinned")),
103 }
104 }
105}
106
107impl PinStore for MemDataStore {
108 async fn is_pinned(&self, block: &Cid) -> Result<bool, Error> {
109 let key = block.to_bytes();
110
111 let g = self.pin.lock().await;
112
113 Ok(g.contains_key(&key))
121 }
122
123 async fn insert_direct_pin(&self, target: &Cid) -> Result<(), Error> {
124 let mut g = Mutex::lock_owned(Arc::clone(&self.pin)).await;
125 Self::insert_pin(&mut g, target, &PinKind::Direct)?;
126 Ok(())
127 }
128
129 async fn remove_direct_pin(&self, target: &Cid) -> Result<(), Error> {
130 let mut g = Mutex::lock_owned(Arc::clone(&self.pin)).await;
131 Self::remove_pin(&mut g, target, &PinKind::Direct)?;
132 Ok(())
133 }
134
135 async fn insert_recursive_pin(
136 &self,
137 target: &Cid,
138 mut refs: crate::repo::References<'_>,
139 ) -> Result<(), Error> {
140 use futures::stream::TryStreamExt;
141
142 let mut g = Mutex::lock_owned(Arc::clone(&self.pin)).await;
143
144 Self::insert_pin(&mut g, target, &PinKind::RecursiveIntention)?;
146
147 let target_v1 = if target.version() == cid::Version::V1 {
148 target.to_owned()
149 } else {
150 Cid::new_v1(target.codec(), target.hash().to_owned())
152 };
153
154 let mut count = 0;
159 let kind = PinKind::IndirectFrom(&target_v1);
160 while let Some(next) = refs.try_next().await? {
161 Self::insert_pin(&mut g, &next, &kind)?;
163 count += 1;
164 }
165
166 let kind = PinKind::Recursive(count as u64);
167 Self::insert_pin(&mut g, target, &kind)?;
168
169 Ok(())
170 }
171
172 async fn remove_recursive_pin(
173 &self,
174 target: &Cid,
175 mut refs: crate::repo::References<'_>,
176 ) -> Result<(), Error> {
177 use futures::TryStreamExt;
178
179 let mut g = Mutex::lock_owned(Arc::clone(&self.pin)).await;
180
181 let doc: PinDocument = match g.get(&target.to_bytes()) {
182 Some(raw) => match serde_json::from_slice(raw) {
183 Ok(doc) => doc,
184 Err(e) => return Err(e.into()),
185 },
186 None => return Err(anyhow::anyhow!("not pinned or pinned indirectly")),
188 };
189
190 let kind = match doc.pick_kind() {
191 Some(Ok(kind @ PinKind::Recursive(_)))
192 | Some(Ok(kind @ PinKind::RecursiveIntention)) => kind,
193 Some(Ok(PinKind::Direct)) => {
194 Self::remove_pin(&mut g, target, &PinKind::Direct)?;
195 return Ok(());
196 }
197 Some(Ok(PinKind::IndirectFrom(cid))) => {
198 return Err(anyhow::anyhow!("pinned indirectly through {}", cid));
199 }
200 _ => return Err(anyhow::anyhow!("not pinned or pinned indirectly")),
202 };
203
204 Self::remove_pin(&mut g, target, &kind.as_ref())?;
206
207 let target_v1 = if target.version() == cid::Version::V1 {
208 target.to_owned()
209 } else {
210 Cid::new_v1(target.codec(), target.hash().to_owned())
212 };
213
214 let kind = PinKind::IndirectFrom(&target_v1);
215 while let Some(next) = refs.try_next().await? {
216 Self::remove_pin(&mut g, &next, &kind)?;
218 }
219
220 Ok(())
221 }
222
223 async fn list(
224 &self,
225 requirement: Option<PinMode>,
226 ) -> futures::stream::BoxStream<'static, Result<(Cid, PinMode), Error>> {
227 use futures::stream::StreamExt;
228 use std::convert::TryFrom;
229 let g = self.pin.lock().await;
230
231 let requirement = PinModeRequirement::from(requirement);
232
233 let copy = g
234 .iter()
235 .map(|(key, value)| {
236 let cid = Cid::try_from(key.as_slice())?;
237 let doc: PinDocument = serde_json::from_slice(value)?;
238 let mode = doc.mode().ok_or_else(|| anyhow::anyhow!("invalid mode"))?;
239
240 Ok((cid, mode))
241 })
242 .filter(move |res| {
243 match res {
245 Ok((_, mode)) => requirement.matches(mode),
246 Err(_) => true,
247 }
248 })
249 .collect::<Vec<_>>();
250
251 futures::stream::iter(copy).boxed()
252 }
253
254 async fn query(
255 &self,
256 cids: Vec<Cid>,
257 requirement: Option<PinMode>,
258 ) -> Result<Vec<(Cid, PinKind<Cid>)>, Error> {
259 let g = self.pin.lock().await;
260
261 let requirement = PinModeRequirement::from(requirement);
262
263 cids.into_iter()
264 .map(move |cid| {
265 match g.get(&cid.to_bytes()) {
266 Some(raw) => {
267 let doc: PinDocument = match serde_json::from_slice(raw) {
268 Ok(doc) => doc,
269 Err(e) => return Err(e.into()),
270 };
271 let mode = match doc.pick_kind() {
274 Some(Ok(kind)) => kind,
275 Some(Err(invalid_cid)) => return Err(Error::new(invalid_cid)),
276 None => {
277 trace!(doc = ?doc, "could not pick pin kind");
278 return Err(anyhow::anyhow!("{} is not pinned", cid));
279 }
280 };
281
282 let matches = requirement.matches(&mode);
286
287 if matches {
288 trace!(cid = %cid, req = ?requirement, "pin matches");
289 return Ok((cid, mode));
290 } else {
291 return Err(anyhow::anyhow!(
293 "{} is not pinned as {:?}",
294 cid,
295 requirement
296 .required()
297 .expect("matches is never false if requirement is none")
298 ));
299 }
300 }
301 None => {
302 trace!(cid = %cid, "no record found");
303 }
304 }
305
306 Err(anyhow::anyhow!("{} is not pinned", cid))
308 })
309 .collect::<Result<Vec<_>, _>>()
310 }
311}
312
313impl DataStore for MemDataStore {
314 async fn init(&self) -> Result<(), Error> {
315 Ok(())
316 }
317
318 async fn contains(&self, key: &[u8]) -> Result<bool, Error> {
319 let contains = self.inner.lock().await.contains_key(key);
320 Ok(contains)
321 }
322
323 async fn get(&self, key: &[u8]) -> Result<Option<Vec<u8>>, Error> {
324 let value = self
325 .inner
326 .lock()
327 .await
328 .get(key)
329 .map(|value| value.to_owned());
330 Ok(value)
331 }
332
333 async fn put(&self, key: &[u8], value: &[u8]) -> Result<(), Error> {
334 self.inner
335 .lock()
336 .await
337 .insert(key.to_owned(), value.to_owned());
338 Ok(())
339 }
340
341 async fn remove(&self, key: &[u8]) -> Result<(), Error> {
342 self.inner.lock().await.remove(key);
343 Ok(())
344 }
345
346 async fn iter(&self) -> futures::stream::BoxStream<'static, (Vec<u8>, Vec<u8>)> {
347 let list = self.inner.lock().await.clone();
348
349 let stream = async_stream::stream! {
350 for (k, v) in list {
351 yield (k, v)
352 }
353 };
354
355 stream.boxed()
356 }
357}
358
359#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
360enum Recursive {
361 Count(u64),
364 Intent,
369 Not,
371}
372
373impl Recursive {
374 fn is_set(&self) -> bool {
375 match self {
376 Recursive::Count(_) | Recursive::Intent => true,
377 Recursive::Not => false,
378 }
379 }
380}
381
382#[derive(Debug, Serialize, Deserialize)]
383struct PinDocument {
384 version: u8,
385 direct: bool,
386 recursive: Recursive,
388 cid_version: u8,
390 indirect_by: Vec<String>,
392}
393
394impl PinDocument {
395 fn update(&mut self, add: bool, kind: &PinKind<&'_ Cid>) -> Result<bool, PinUpdateError> {
396 match kind {
401 PinKind::IndirectFrom(root) => {
402 let root = if root.version() == cid::Version::V1 {
403 root.to_string()
404 } else {
405 Cid::new_v1(root.codec(), (*root).hash().to_owned()).to_string()
407 };
408
409 let modified = if self.indirect_by.is_empty() {
410 if add {
411 self.indirect_by.push(root);
412 true
413 } else {
414 false
415 }
416 } else {
417 let mut set = self
418 .indirect_by
419 .drain(..)
420 .collect::<std::collections::BTreeSet<_>>();
421
422 let modified = if add {
423 set.insert(root)
424 } else {
425 set.remove(&root)
426 };
427
428 self.indirect_by.extend(set);
429 modified
430 };
431
432 Ok(modified)
433 }
434 PinKind::Direct => {
435 if self.recursive.is_set() && !self.direct && add {
436 return Err(PinUpdateError::AlreadyPinnedRecursive);
439 }
440
441 if !add && !self.direct {
442 return match !self.recursive.is_set() {
443 true => Err(PinUpdateError::CannotUnpinUnpinned),
444 false => Err(PinUpdateError::CannotUnpinDirectOnRecursivelyPinned),
445 };
446 }
447
448 let modified = self.direct != add;
449 self.direct = add;
450 Ok(modified)
451 }
452 PinKind::RecursiveIntention => {
453 let modified = if add {
454 match self.recursive {
455 Recursive::Count(_) => return Err(PinUpdateError::AlreadyPinnedRecursive),
456 Recursive::Intent => false,
459 Recursive::Not => {
460 self.recursive = Recursive::Intent;
461 self.direct = false;
462 true
463 }
464 }
465 } else {
466 match self.recursive {
467 Recursive::Count(_) | Recursive::Intent => {
468 self.recursive = Recursive::Not;
469 true
470 }
471 Recursive::Not => false,
472 }
473 };
474
475 Ok(modified)
476 }
477 PinKind::Recursive(descendants) => {
478 let descendants = *descendants;
479 let modified = if add {
480 match self.recursive {
481 Recursive::Count(other) if other != descendants => {
482 return Err(PinUpdateError::UnexpectedNumberOfDescendants(
483 other,
484 descendants,
485 ));
486 }
487 Recursive::Count(_) => false,
488 Recursive::Intent | Recursive::Not => {
489 self.recursive = Recursive::Count(descendants);
490 self.direct = false;
493 true
494 }
495 }
496 } else {
497 match self.recursive {
498 Recursive::Count(other) if other != descendants => {
499 return Err(PinUpdateError::UnexpectedNumberOfDescendants(
500 other,
501 descendants,
502 ));
503 }
504 Recursive::Count(_) | Recursive::Intent => {
505 self.recursive = Recursive::Not;
506 true
507 }
508 Recursive::Not => return Err(PinUpdateError::NotPinnedRecursive),
509 }
510 };
514 Ok(modified)
515 }
516 }
517 }
518
519 fn can_remove(&self) -> bool {
520 !self.direct && !self.recursive.is_set() && self.indirect_by.is_empty()
521 }
522
523 fn mode(&self) -> Option<PinMode> {
524 if self.recursive.is_set() {
525 Some(PinMode::Recursive)
526 } else if !self.indirect_by.is_empty() {
527 Some(PinMode::Indirect)
528 } else if self.direct {
529 Some(PinMode::Direct)
530 } else {
531 None
532 }
533 }
534
535 fn pick_kind(&self) -> Option<Result<PinKind<Cid>, cid::Error>> {
536 self.mode().map(|p| {
537 Ok(match p {
538 PinMode::Recursive => match self.recursive {
539 Recursive::Intent => PinKind::RecursiveIntention,
540 Recursive::Count(total) => PinKind::Recursive(total),
541 _ => unreachable!("mode should not have returned PinKind::Recursive"),
542 },
543 PinMode::Indirect => {
544 let cid = Cid::try_from(self.indirect_by[0].as_str())?;
548 PinKind::IndirectFrom(cid)
549 }
550 PinMode::Direct => PinKind::Direct,
551 })
552 })
553 }
554}
555
556#[derive(Debug, thiserror::Error)]
558pub enum PinUpdateError {
559 #[error("unexpected number of descendants ({}), found {}", .1, .0)]
561 UnexpectedNumberOfDescendants(u64, u64),
562 #[error("not pinned recursively")]
564 NotPinnedRecursive,
565 #[error("already pinned recursively")]
567 AlreadyPinnedRecursive,
568 #[error("not pinned or pinned indirectly")]
570 CannotUnpinUnpinned,
571 #[error("is pinned recursively")]
574 CannotUnpinDirectOnRecursivelyPinned,
575}
576
// Instantiates the shared pin store interface test suite against the in-memory store.
#[cfg(test)]
crate::pinstore_interface_tests!(common_tests, crate::repo::datastore::memory::MemDataStore::new);
582
#[cfg(test)]
mod tests {
    use super::*;

    /// Walks one key through the miss -> put -> hit -> remove -> miss cycle.
    #[tokio::test]
    async fn test_mem_datastore() {
        let store = MemDataStore::new(std::env::temp_dir());
        let key: [u8; 4] = [1, 2, 3, 4];
        let value: [u8; 4] = [5, 6, 7, 8];

        store.init().await.unwrap();

        // Nothing stored yet; removing a missing key must still succeed.
        assert!(!store.contains(&key).await.unwrap());
        assert_eq!(store.get(&key).await.unwrap(), None);
        store.remove(&key).await.unwrap();

        store.put(&key, &value).await.unwrap();
        assert!(store.contains(&key).await.unwrap());
        assert_eq!(store.get(&key).await.unwrap(), Some(value.to_vec()));

        store.remove(&key).await.unwrap();
        assert!(!store.contains(&key).await.unwrap());
        assert_eq!(store.get(&key).await.unwrap(), None);
    }

    /// A fresh document accepts a direct pin and reports it back as such.
    #[test]
    fn pindocument_on_direct_pin() {
        let mut doc = PinDocument {
            version: 0,
            direct: false,
            recursive: Recursive::Not,
            cid_version: 0,
            indirect_by: Vec::new(),
        };

        let changed = doc.update(true, &PinKind::Direct).unwrap();
        assert!(changed);

        assert_eq!(doc.mode(), Some(PinMode::Direct));
        assert_eq!(doc.pick_kind().unwrap().unwrap(), PinKind::Direct);
    }
}