1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
use crate::BlockStore;
use anyhow::Result;
use async_trait::async_trait;
use cid::Cid;
use tokio::sync::mpsc::{channel, Receiver, Sender};

/// A pass-through wrapper around any [BlockStore] that "taps" reads: each
/// block that is successfully retrieved from the wrapped store is cloned and
/// sent, together with its [Cid], over an MPSC channel.
///
/// This lets an observer record every block that was needed to load an
/// arbitrarily deep and complex DAG into memory, without the DAG
/// implementations having to invoke a dedicated callback.
///
/// Note that the [Receiver] end of the channel considers the channel open
/// (and so keeps awaiting values) for as long as any associated
/// [BlockStoreTap] clone is still alive. If a finite number of blocks is
/// expected, make sure every [BlockStoreTap] clone is eventually dropped;
/// otherwise the [Receiver] will keep waiting for more blocks.
#[derive(Clone)]
pub struct BlockStoreTap<S: BlockStore> {
    // The wrapped store that actually services reads and writes.
    store: S,
    // Sending half of the tap channel; carries `(cid, block bytes)` pairs.
    tx: Sender<(Cid, Vec<u8>)>,
}

impl<S: BlockStore> BlockStoreTap<S> {
    /// Wraps `store` in a tap and creates the bounded channel it reports to.
    ///
    /// `capacity` is passed directly to [channel] as the size of the
    /// channel's buffer. Returns the tap together with the [Receiver] on
    /// which `(Cid, Vec<u8>)` pairs for retrieved blocks will arrive.
    ///
    /// NOTE(review): tokio documents that `channel` panics when given a
    /// capacity of zero, so callers should pass a non-zero `capacity`.
    pub fn new(store: S, capacity: usize) -> (Self, Receiver<(Cid, Vec<u8>)>) {
        let (sender, receiver) = channel(capacity);
        let tap = Self { store, tx: sender };

        (tap, receiver)
    }
}

#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
impl<S> BlockStore for BlockStoreTap<S>
where
    S: BlockStore,
{
    /// Forwards the write to the wrapped store unchanged. Writes are not
    /// reported on the tap channel; only successful reads are.
    async fn put_block(&mut self, cid: &Cid, block: &[u8]) -> Result<()> {
        self.store.put_block(cid, block).await
    }

    /// Reads the block for `cid` from the wrapped store. When a block is
    /// found, a copy of it (paired with its [Cid]) is sent over the tap
    /// channel before the block is returned to the caller. A miss (`None`)
    /// is returned as-is and nothing is sent.
    ///
    /// # Errors
    ///
    /// Returns an error if the wrapped store fails to read, or if sending on
    /// the tap channel fails (tokio reports a send error once the receiving
    /// half has been closed or dropped).
    ///
    /// Because the channel is bounded, the send awaits free capacity; a
    /// receiver that is never drained will therefore stall reads once the
    /// buffer is full.
    async fn get_block(&self, cid: &Cid) -> Result<Option<Vec<u8>>> {
        let maybe_block = self.store.get_block(cid).await?;

        if let Some(block) = &maybe_block {
            // `Cid` is `Copy`, so a dereference is enough; only the block
            // bytes need an actual clone for the observer.
            self.tx.send((*cid, block.clone())).await?;
        }

        Ok(maybe_block)
    }
}

#[cfg(test)]
mod tests {
    use cid::Cid;
    use libipld_cbor::DagCborCodec;
    use tokio_stream::{wrappers::ReceiverStream, StreamExt};
    #[cfg(target_arch = "wasm32")]
    use wasm_bindgen_test::wasm_bindgen_test;

    use crate::{block_deserialize, BlockStore, BlockStoreTap, MemoryStore};

    #[cfg(target_arch = "wasm32")]
    wasm_bindgen_test::wasm_bindgen_test_configure!(run_in_browser);

    /// Saves ten single-byte blocks through the tap, loads each of them back,
    /// and checks that the channel yields exactly those blocks, in load
    /// order, with their original contents.
    #[cfg_attr(target_arch = "wasm32", wasm_bindgen_test)]
    #[cfg_attr(not(target_arch = "wasm32"), tokio::test)]
    async fn it_sends_all_retrieved_blocks_to_the_channel() {
        let store = MemoryStore::default();

        let (mut tap, mut rx) = BlockStoreTap::new(store, 32);

        let mut cids = Vec::new();

        for i in 0..10 {
            cids.push(tap.save::<DagCborCodec, _>(vec![i as u8]).await.unwrap());
        }

        // Only saves have happened so far, so nothing may be on the channel.
        assert_eq!(
            rx.try_recv(),
            Err(tokio::sync::mpsc::error::TryRecvError::Empty)
        );

        for cid in cids.iter() {
            tap.load::<DagCborCodec, Vec<u8>>(cid).await.unwrap();
        }

        // Dropping the only tap closes the channel so that collecting the
        // stream below terminates.
        drop(tap);

        let stream = ReceiverStream::new(rx);
        let results = stream.collect::<Vec<(Cid, Vec<u8>)>>().await;

        // Without this check the loop below would pass vacuously if the tap
        // had sent fewer blocks than were loaded (or none at all).
        assert_eq!(results.len(), cids.len());

        for (i, (cid, bytes)) in results.iter().enumerate() {
            assert_eq!(cid, &cids[i]);

            let value = block_deserialize::<DagCborCodec, Vec<u8>>(bytes).unwrap();

            assert_eq!(value.as_slice(), &[i as u8]);
        }
    }
}