co_storage/storage/
request.rs1use crate::{BlockStat, BlockStorage, StorageError};
5use async_trait::async_trait;
6use cid::Cid;
7use co_primitives::{Block, BlockStorageCloneSettings, CloneWithBlockStorageSettings};
8use futures::{
9 channel::{mpsc, oneshot},
10 SinkExt,
11};
12
13#[derive(Debug)]
14pub enum Request {
15 Get(Cid, oneshot::Sender<Result<Block, StorageError>>),
16 Set(Block, oneshot::Sender<Result<Cid, StorageError>>),
17 Remove(Cid, oneshot::Sender<Result<(), StorageError>>),
18 Stat(Cid, oneshot::Sender<Result<BlockStat, StorageError>>),
19}
20
21#[derive(Debug, Clone)]
22pub struct RequestBlockStorage {
23 sender: mpsc::Sender<Request>,
24 max_block_size: usize,
25}
26impl RequestBlockStorage {
27 pub fn new(buffer: usize, max_block_size: usize) -> (Self, mpsc::Receiver<Request>) {
28 let (tx, rx) = mpsc::channel(buffer);
29 (Self { sender: tx, max_block_size }, rx)
30 }
31}
32#[async_trait]
33impl BlockStorage for RequestBlockStorage {
34 async fn get(&self, cid: &Cid) -> Result<Block, StorageError> {
36 let (tx, rx) = oneshot::channel();
37 self.sender
38 .clone()
39 .send(Request::Get(*cid, tx))
40 .await
41 .map_err(|e| StorageError::Internal(e.into()))?;
42 rx.await.map_err(|e| StorageError::Internal(e.into()))?
43 }
44
45 async fn set(&self, block: Block) -> Result<Cid, StorageError> {
48 let (tx, rx) = oneshot::channel();
49 self.sender
50 .clone()
51 .send(Request::Set(block, tx))
52 .await
53 .map_err(|e| StorageError::Internal(e.into()))?;
54 rx.await.map_err(|e| StorageError::Internal(e.into()))?
55 }
56
57 async fn remove(&self, cid: &Cid) -> Result<(), StorageError> {
59 let (tx, rx) = oneshot::channel();
60 self.sender
61 .clone()
62 .send(Request::Remove(*cid, tx))
63 .await
64 .map_err(|e| StorageError::Internal(e.into()))?;
65 rx.await.map_err(|e| StorageError::Internal(e.into()))?
66 }
67
68 async fn stat(&self, cid: &Cid) -> Result<BlockStat, StorageError> {
70 let (tx, rx) = oneshot::channel();
71 self.sender
72 .clone()
73 .send(Request::Stat(*cid, tx))
74 .await
75 .map_err(|e| StorageError::Internal(e.into()))?;
76 rx.await.map_err(|e| StorageError::Internal(e.into()))?
77 }
78
79 fn max_block_size(&self) -> usize {
80 self.max_block_size
81 }
82}
83impl CloneWithBlockStorageSettings for RequestBlockStorage {
84 fn clone_with_settings(&self, _settings: BlockStorageCloneSettings) -> Self {
85 self.clone()
86 }
87}