noosphere_storage/
tap.rs

1use crate::BlockStore;
2use anyhow::Result;
3use async_trait::async_trait;
4use cid::Cid;
5use tokio::sync::mpsc::{channel, Receiver, Sender};
6
7/// Wraps any [BlockStore] and "taps" it by cloning any block successfully
8/// retrieved from the store and sending over an MPSC channel. This allows an
9/// observer to record all the blocks needed to load arbitrarily deep and
10/// complex DAGs into memory without orchestrating a dedicated callback for the
11/// DAG implementations to invoke.
12///
13/// Note that the [Receiver] end of the channel will consider the channel open
14/// (and thus will continue to await values) until all of its associated
15/// [BlockStoreTap] clones are dropped. If you expect a finite number of blocks
16/// to be sent to the [Receiver], ensure that all [BlockStoreTap] clones are
17/// eventually dropped. Otherwise, the [Receiver] will continue waiting to
18/// receive blocks.
19#[derive(Clone)]
20pub struct BlockStoreTap<S>
21where
22    S: BlockStore,
23{
24    store: S,
25    tx: Sender<(Cid, Vec<u8>)>,
26}
27
28impl<S> BlockStoreTap<S>
29where
30    S: BlockStore,
31{
32    pub fn new(store: S, capacity: usize) -> (Self, Receiver<(Cid, Vec<u8>)>) {
33        let (tx, rx) = channel(capacity);
34        (BlockStoreTap { store, tx }, rx)
35    }
36}
37
38#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
39#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
40impl<S> BlockStore for BlockStoreTap<S>
41where
42    S: BlockStore,
43{
44    async fn put_block(&mut self, cid: &Cid, block: &[u8]) -> Result<()> {
45        self.store.put_block(cid, block).await
46    }
47
48    async fn get_block(&self, cid: &Cid) -> Result<Option<Vec<u8>>> {
49        Ok(match self.store.get_block(cid).await? {
50            Some(block) => {
51                self.tx.send((*cid, block.clone())).await?;
52                Some(block)
53            }
54            None => None,
55        })
56    }
57}
58
59#[cfg(test)]
60mod tests {
61    use cid::Cid;
62    use libipld_cbor::DagCborCodec;
63    use tokio_stream::{wrappers::ReceiverStream, StreamExt};
64    #[cfg(target_arch = "wasm32")]
65    use wasm_bindgen_test::wasm_bindgen_test;
66
67    use crate::{block_deserialize, BlockStore, BlockStoreTap, MemoryStore};
68
69    #[cfg(target_arch = "wasm32")]
70    wasm_bindgen_test::wasm_bindgen_test_configure!(run_in_browser);
71
72    #[cfg_attr(target_arch = "wasm32", wasm_bindgen_test)]
73    #[cfg_attr(not(target_arch = "wasm32"), tokio::test)]
74    async fn it_sends_all_retrieved_blocks_to_the_channel() {
75        let store = MemoryStore::default();
76
77        let (mut tap, mut rx) = BlockStoreTap::new(store, 32);
78
79        let mut cids = Vec::new();
80
81        for i in 0..10 {
82            cids.push(tap.save::<DagCborCodec, _>(vec![i as u8]).await.unwrap());
83        }
84
85        assert_eq!(
86            rx.try_recv(),
87            Err(tokio::sync::mpsc::error::TryRecvError::Empty)
88        );
89
90        for cid in cids.iter() {
91            tap.load::<DagCborCodec, Vec<u8>>(cid).await.unwrap();
92        }
93
94        drop(tap);
95
96        let stream = ReceiverStream::new(rx);
97        let results = stream.collect::<Vec<(Cid, Vec<u8>)>>().await;
98
99        for (i, (cid, bytes)) in results.iter().enumerate() {
100            assert_eq!(cid, &cids[i]);
101
102            let value = block_deserialize::<DagCborCodec, Vec<u8>>(bytes).unwrap();
103
104            assert_eq!(value.as_slice(), &[i as u8]);
105        }
106    }
107}