1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
use anyhow::{anyhow, Result};
use async_std::sync::Mutex;
use async_trait::async_trait;
use cid::Cid;
use std::{collections::HashMap, sync::Arc};

use crate::storage::Storage;
use crate::store::Store;

#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
pub trait StoreContainsCid {
    async fn contains_cid(&self, cid: &Cid) -> Result<bool>;
}

#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
impl<S: Store> StoreContainsCid for S {
    async fn contains_cid(&self, cid: &Cid) -> Result<bool> {
        Ok(self.read(&cid.to_bytes()).await?.is_some())
    }
}

#[derive(Default, Clone, Debug)]
pub struct MemoryStorage {
    stores: Arc<Mutex<HashMap<String, MemoryStore>>>,
}

impl MemoryStorage {
    async fn get_store(&self, name: &str) -> Result<MemoryStore> {
        let mut stores = self.stores.lock().await;

        if !stores.contains_key(name) {
            stores.insert(name.to_string(), Default::default());
        }

        stores
            .get(name)
            .cloned()
            .ok_or_else(|| anyhow!("Failed to initialize {} store", name))
    }
}

#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
impl Storage for MemoryStorage {
    type BlockStore = MemoryStore;

    type KeyValueStore = MemoryStore;

    async fn get_block_store(&self, name: &str) -> Result<Self::BlockStore> {
        self.get_store(name).await
    }

    async fn get_key_value_store(&self, name: &str) -> Result<Self::KeyValueStore> {
        self.get_store(name).await
    }
}

#[derive(Clone, Default, Debug)]
pub struct MemoryStore {
    pub entries: Arc<Mutex<HashMap<Vec<u8>, Vec<u8>>>>,
}

impl MemoryStore {
    pub async fn get_stored_cids(&self) -> Vec<Cid> {
        self.entries
            .lock()
            .await
            .keys()
            .filter_map(|bytes| match Cid::try_from(bytes.as_slice()) {
                Ok(cid) => Some(cid),
                _ => None,
            })
            .collect()
    }

    pub async fn expect_replica_in<S: Store>(&self, other: &S) -> Result<()> {
        let cids = self.get_stored_cids().await;
        let mut missing = Vec::new();

        for cid in cids {
            trace!("Checking for {}", cid);

            if !other.contains_cid(&cid).await? {
                trace!("Not found!");
                missing.push(cid);
            }
        }

        if !missing.is_empty() {
            return Err(anyhow!(
                "Expected replica, but the following CIDs are missing: {:#?}",
                missing
                    .into_iter()
                    .map(|cid| format!("{cid}"))
                    .collect::<Vec<String>>()
            ));
        }

        Ok(())
    }

    pub async fn fork(&self) -> Self {
        MemoryStore {
            entries: Arc::new(Mutex::new(self.entries.lock().await.clone())),
        }
    }
}

#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
impl Store for MemoryStore {
    async fn read(&self, key: &[u8]) -> Result<Option<Vec<u8>>> {
        let dags = self.entries.lock().await;
        Ok(dags.get(key).cloned())
    }

    async fn write(&mut self, key: &[u8], bytes: &[u8]) -> Result<Option<Vec<u8>>> {
        let mut dags = self.entries.lock().await;
        let old_value = dags.get(key).cloned();

        dags.insert(key.to_vec(), bytes.to_vec());

        Ok(old_value)
    }

    async fn remove(&mut self, key: &[u8]) -> Result<Option<Vec<u8>>> {
        let mut dags = self.entries.lock().await;
        Ok(dags.remove(key))
    }
}