1use crate::BlockStore;
2use anyhow::Result;
3use async_trait::async_trait;
4use cid::Cid;
5use tokio::sync::mpsc::{channel, Receiver, Sender};
6
7#[derive(Clone)]
20pub struct BlockStoreTap<S>
21where
22 S: BlockStore,
23{
24 store: S,
25 tx: Sender<(Cid, Vec<u8>)>,
26}
27
28impl<S> BlockStoreTap<S>
29where
30 S: BlockStore,
31{
32 pub fn new(store: S, capacity: usize) -> (Self, Receiver<(Cid, Vec<u8>)>) {
33 let (tx, rx) = channel(capacity);
34 (BlockStoreTap { store, tx }, rx)
35 }
36}
37
38#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
39#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
40impl<S> BlockStore for BlockStoreTap<S>
41where
42 S: BlockStore,
43{
44 async fn put_block(&mut self, cid: &Cid, block: &[u8]) -> Result<()> {
45 self.store.put_block(cid, block).await
46 }
47
48 async fn get_block(&self, cid: &Cid) -> Result<Option<Vec<u8>>> {
49 Ok(match self.store.get_block(cid).await? {
50 Some(block) => {
51 self.tx.send((*cid, block.clone())).await?;
52 Some(block)
53 }
54 None => None,
55 })
56 }
57}
58
#[cfg(test)]
mod tests {
    use cid::Cid;
    use libipld_cbor::DagCborCodec;
    use tokio_stream::{wrappers::ReceiverStream, StreamExt};
    #[cfg(target_arch = "wasm32")]
    use wasm_bindgen_test::wasm_bindgen_test;

    use crate::{block_deserialize, BlockStore, BlockStoreTap, MemoryStore};

    #[cfg(target_arch = "wasm32")]
    wasm_bindgen_test::wasm_bindgen_test_configure!(run_in_browser);

    /// Saves ten blocks through the tap, loads each of them back, and checks
    /// that exactly those blocks arrive on the channel in load order.
    #[cfg_attr(target_arch = "wasm32", wasm_bindgen_test)]
    #[cfg_attr(not(target_arch = "wasm32"), tokio::test)]
    async fn it_sends_all_retrieved_blocks_to_the_channel() {
        let store = MemoryStore::default();

        let (mut tap, mut rx) = BlockStoreTap::new(store, 32);

        let mut cids = Vec::new();

        for i in 0..10u8 {
            cids.push(tap.save::<DagCborCodec, _>(vec![i]).await.unwrap());
        }

        // Writes alone must not produce anything on the channel.
        assert_eq!(
            rx.try_recv(),
            Err(tokio::sync::mpsc::error::TryRecvError::Empty)
        );

        for cid in cids.iter() {
            tap.load::<DagCborCodec, Vec<u8>>(cid).await.unwrap();
        }

        // Dropping the tap drops the only sender, which closes the channel and
        // lets the stream below terminate.
        drop(tap);

        let stream = ReceiverStream::new(rx);
        let results = stream.collect::<Vec<(Cid, Vec<u8>)>>().await;

        // Without this check the loop below would pass vacuously if the tap
        // sent nothing (or fewer blocks than were loaded).
        assert_eq!(results.len(), cids.len());

        for (i, (cid, bytes)) in results.iter().enumerate() {
            assert_eq!(cid, &cids[i]);

            let value = block_deserialize::<DagCborCodec, Vec<u8>>(bytes).unwrap();

            assert_eq!(value.as_slice(), &[i as u8]);
        }
    }
}