use crate::{BlockStat, BlockStorage, Storage, StorageError};
use async_trait::async_trait;
use cid::Cid;
use co_primitives::{Block, DefaultParams};
use futures::Future;
use std::{
marker::PhantomData,
sync::{
mpsc::{SendError, Sender},
Arc,
},
thread::{self, JoinHandle},
};
use tokio::runtime::Handle;
/// Synchronous [`Storage`] front-end that forwards every request over a channel to a
/// dedicated worker thread, which owns the wrapped storage `S`.
///
/// Clones share the same request channel and the same worker thread.
#[derive(Debug)]
pub struct SyncStorage<S>
where
S: Storage,
{
/// Request channel to the worker thread spawned in `new`.
sender: Sender<Message>,
/// Join handle of the worker thread, shared between clones; it is stored but never
/// joined anywhere in this file.
_handle: Arc<JoinHandle<()>>,
/// Records the wrapped storage type; the `S` value itself is moved into the worker thread.
_type: PhantomData<S>,
}
impl<S> Clone for SyncStorage<S>
where
S: Storage,
{
fn clone(&self) -> Self {
Self { sender: self.sender.clone(), _handle: self._handle.clone(), _type: self._type }
}
}
impl<S> SyncStorage<S>
where
S: Storage + Send + 'static,
{
pub fn new(mut next: S) -> Self {
let (sender, receiver) = std::sync::mpsc::channel::<Message>();
let handle = thread::spawn(move || {
fn handle_send_result<T>(t: Result<(), SendError<T>>) {
if let Err(err) = t {
tracing::warn!(?err, "send-failed");
}
}
loop {
match receiver.recv() {
Err(_) => break, Ok(Message::Get(cid, result)) => handle_send_result(result.send(next.get(&cid))),
Ok(Message::Set(block, result)) => handle_send_result(result.send(next.set(block))),
Ok(Message::Remove(cid, result)) => handle_send_result(result.send(next.remove(&cid))),
}
}
});
Self { sender, _handle: Arc::new(handle), _type: Default::default() }
}
}
impl<S> Storage for SyncStorage<S>
where
    S: Storage + Send + 'static,
{
    type StoreParams = S::StoreParams;

    /// Sends a `Get` request to the worker thread and blocks until its reply arrives.
    ///
    /// # Errors
    /// Returns the wrapped storage's own error, or `StorageError::Internal` when the
    /// request cannot be sent or the reply channel is closed without an answer.
    fn get(&self, cid: &Cid) -> Result<Block, StorageError> {
        let (reply_tx, reply_rx) = std::sync::mpsc::channel::<Result<Block, StorageError>>();
        let request = Message::Get(*cid, reply_tx);
        if let Err(e) = self.sender.send(request) {
            return Err(StorageError::Internal(e.into()));
        }
        reply_rx.recv().unwrap_or_else(|e| Err(StorageError::Internal(e.into())))
    }

    /// Sends a `Set` request to the worker thread and blocks until its reply arrives.
    ///
    /// # Errors
    /// Returns the wrapped storage's own error, or `StorageError::Internal` when the
    /// request cannot be sent or the reply channel is closed without an answer.
    fn set(&mut self, block: Block) -> Result<Cid, StorageError> {
        let (reply_tx, reply_rx) = std::sync::mpsc::channel::<Result<Cid, StorageError>>();
        let request = Message::Set(block, reply_tx);
        if let Err(e) = self.sender.send(request) {
            return Err(StorageError::Internal(e.into()));
        }
        reply_rx.recv().unwrap_or_else(|e| Err(StorageError::Internal(e.into())))
    }

    /// Sends a `Remove` request to the worker thread and blocks until its reply arrives.
    ///
    /// # Errors
    /// Returns the wrapped storage's own error, or `StorageError::Internal` when the
    /// request cannot be sent or the reply channel is closed without an answer.
    fn remove(&mut self, cid: &Cid) -> Result<(), StorageError> {
        let (reply_tx, reply_rx) = std::sync::mpsc::channel::<Result<(), StorageError>>();
        let request = Message::Remove(*cid, reply_tx);
        if let Err(e) = self.sender.send(request) {
            return Err(StorageError::Internal(e.into()));
        }
        reply_rx.recv().unwrap_or_else(|e| Err(StorageError::Internal(e.into())))
    }
}
/// Request sent from a `SyncStorage` handle to its worker thread.
///
/// Every variant carries the operation's argument together with the sending half of a
/// per-request channel on which the worker sends back the operation's result.
#[derive(Debug)]
enum Message {
/// Ask the worker to call `get` with this `Cid` on the wrapped storage.
Get(Cid, Sender<Result<Block, StorageError>>),
/// Ask the worker to call `set` with this `Block` on the wrapped storage.
Set(Block, Sender<Result<Cid, StorageError>>),
/// Ask the worker to call `remove` with this `Cid` on the wrapped storage.
Remove(Cid, Sender<Result<(), StorageError>>),
}
/// Adapter that exposes an asynchronous `BlockStorage` through the synchronous
/// `Storage` trait by running its futures on a tokio runtime.
#[derive(Debug, Clone)]
pub struct SyncBlockStorage<S> {
/// The wrapped storage; it is cloned for every synchronous call so the spawned future owns its copy.
storage: S,
/// Handle to the tokio runtime on which the storage futures are spawned and awaited.
handle: Handle,
}
impl<S> SyncBlockStorage<S>
where
S: BlockStorage + Send + Sync + Clone + 'static,
{
pub fn new(storage: S, handle: Handle) -> Self {
Self { storage, handle }
}
fn execute<F, R>(&self, f: F) -> Result<R, StorageError>
where
F: Future<Output = Result<R, StorageError>> + Send + 'static,
F::Output: Send + 'static,
R: Send + 'static,
{
match self.handle.block_on(self.handle.spawn(f)) {
Ok(r) => r,
Err(e) => Err(StorageError::Internal(e.into())),
}
}
}
impl<S> Storage for SyncBlockStorage<S>
where
    S: BlockStorage + Send + Sync + Clone + 'static,
{
    type StoreParams = DefaultParams;

    /// Blocking `get`: runs the wrapped storage's async `get` to completion via `execute`.
    fn get(&self, cid: &Cid) -> Result<Block, StorageError> {
        // The spawned future must be `'static`, so it owns a clone of the storage
        // and a copy of the CID instead of borrowing from `self` or the caller.
        let (inner, key) = (self.storage.clone(), *cid);
        let task = async move { inner.get(&key).await };
        self.execute(task)
    }

    /// Blocking `set`: runs the wrapped storage's async `set` to completion via `execute`.
    fn set(&mut self, block: Block) -> Result<Cid, StorageError> {
        let inner = self.storage.clone();
        let task = async move { inner.set(block).await };
        self.execute(task)
    }

    /// Blocking `remove`: runs the wrapped storage's async `remove` to completion via `execute`.
    fn remove(&mut self, cid: &Cid) -> Result<(), StorageError> {
        let (inner, key) = (self.storage.clone(), *cid);
        let task = async move { inner.remove(&key).await };
        self.execute(task)
    }
}
#[async_trait]
impl<S> BlockStorage for SyncBlockStorage<S>
where
    S: BlockStorage + Clone + 'static,
{
    /// Forwards to the wrapped storage's `get` without going through the runtime handle.
    async fn get(&self, cid: &Cid) -> Result<Block, StorageError> {
        let lookup = self.storage.get(cid);
        lookup.await
    }

    /// Forwards to the wrapped storage's `set` without going through the runtime handle.
    async fn set(&self, block: Block) -> Result<Cid, StorageError> {
        let write = self.storage.set(block);
        write.await
    }

    /// Forwards to the wrapped storage's `remove` without going through the runtime handle.
    async fn remove(&self, cid: &Cid) -> Result<(), StorageError> {
        let removal = self.storage.remove(cid);
        removal.await
    }

    /// Forwards to the wrapped storage's `stat` without going through the runtime handle.
    async fn stat(&self, cid: &Cid) -> Result<BlockStat, StorageError> {
        let query = self.storage.stat(cid);
        query.await
    }

    /// Reports the wrapped storage's maximum block size unchanged.
    fn max_block_size(&self) -> usize {
        self.storage.max_block_size()
    }
}