Skip to main content

co_storage/storage/
request.rs

1// SPDX-License-Identifier: AGPL-3.0-only
2// Copyright (C) 2026 1io BRANDGUARDIAN GmbH
3
4use crate::{BlockStat, BlockStorage, StorageError};
5use async_trait::async_trait;
6use cid::Cid;
7use co_primitives::{Block, BlockStorageCloneSettings, CloneWithBlockStorageSettings};
8use futures::{
9	channel::{mpsc, oneshot},
10	SinkExt,
11};
12
13#[derive(Debug)]
14pub enum Request {
15	Get(Cid, oneshot::Sender<Result<Block, StorageError>>),
16	Set(Block, oneshot::Sender<Result<Cid, StorageError>>),
17	Remove(Cid, oneshot::Sender<Result<(), StorageError>>),
18	Stat(Cid, oneshot::Sender<Result<BlockStat, StorageError>>),
19}
20
21#[derive(Debug, Clone)]
22pub struct RequestBlockStorage {
23	sender: mpsc::Sender<Request>,
24	max_block_size: usize,
25}
26impl RequestBlockStorage {
27	pub fn new(buffer: usize, max_block_size: usize) -> (Self, mpsc::Receiver<Request>) {
28		let (tx, rx) = mpsc::channel(buffer);
29		(Self { sender: tx, max_block_size }, rx)
30	}
31}
32#[async_trait]
33impl BlockStorage for RequestBlockStorage {
34	/// Returns a block from storage.
35	async fn get(&self, cid: &Cid) -> Result<Block, StorageError> {
36		let (tx, rx) = oneshot::channel();
37		self.sender
38			.clone()
39			.send(Request::Get(*cid, tx))
40			.await
41			.map_err(|e| StorageError::Internal(e.into()))?;
42		rx.await.map_err(|e| StorageError::Internal(e.into()))?
43	}
44
45	/// Inserts a block into storage.
46	/// Returns the CID of the block (gurranted to be the same as the supplied).
47	async fn set(&self, block: Block) -> Result<Cid, StorageError> {
48		let (tx, rx) = oneshot::channel();
49		self.sender
50			.clone()
51			.send(Request::Set(block, tx))
52			.await
53			.map_err(|e| StorageError::Internal(e.into()))?;
54		rx.await.map_err(|e| StorageError::Internal(e.into()))?
55	}
56
57	/// Remove a block.
58	async fn remove(&self, cid: &Cid) -> Result<(), StorageError> {
59		let (tx, rx) = oneshot::channel();
60		self.sender
61			.clone()
62			.send(Request::Remove(*cid, tx))
63			.await
64			.map_err(|e| StorageError::Internal(e.into()))?;
65		rx.await.map_err(|e| StorageError::Internal(e.into()))?
66	}
67
68	/// Stat a block.
69	async fn stat(&self, cid: &Cid) -> Result<BlockStat, StorageError> {
70		let (tx, rx) = oneshot::channel();
71		self.sender
72			.clone()
73			.send(Request::Stat(*cid, tx))
74			.await
75			.map_err(|e| StorageError::Internal(e.into()))?;
76		rx.await.map_err(|e| StorageError::Internal(e.into()))?
77	}
78
79	fn max_block_size(&self) -> usize {
80		self.max_block_size
81	}
82}
83impl CloneWithBlockStorageSettings for RequestBlockStorage {
84	fn clone_with_settings(&self, _settings: BlockStorageCloneSettings) -> Self {
85		self.clone()
86	}
87}