use crate::config::SCConfig;
use crate::{error::*, BlockServer, StateServer};
use ethers::core::types::H256;
use eth_state_fold_types::ethers;
use eth_state_fold_types::{
Block, BlockState, BlockStreamItem, BlocksSince, QueryBlock, StateStreamItem, StatesSince,
};
use eth_state_server_common::state_fold_server;
use state_fold_server::state_fold_client::StateFoldClient;
use state_fold_server::{
InitialState, QueryBlockRequest, QueryBlocksSinceRequest, QueryStateRequest,
QueryStatesSinceRequest, SubscribeNewBlocksRequest, SubscribeNewStatesRequest,
};
use tonic::{transport::Channel, Request};
use snafu::ResultExt;
use std::sync::Arc;
use async_trait::async_trait;
use std::pin::Pin;
use tokio_stream::{Stream, StreamExt};
pub struct GrpcStateFoldClient<I, S> {
client: StateFoldClient<Channel>,
__marker1: std::marker::PhantomData<I>,
__marker2: std::marker::PhantomData<S>,
}
impl<I, S> GrpcStateFoldClient<I, S> {
pub fn new_from_channel(channel: Channel, config: &SCConfig) -> Self {
let client = StateFoldClient::new(channel)
.max_decoding_message_size(config.max_decoding_message_size);
Self {
client,
__marker1: std::marker::PhantomData,
__marker2: std::marker::PhantomData,
}
}
}
#[async_trait]
impl<I, S> BlockServer for GrpcStateFoldClient<I, S>
where
I: Send + Sync,
S: Send + Sync,
{
async fn query_block(
&self,
query_block: impl Into<QueryBlock> + Send + 'static,
) -> Result<Arc<Block>> {
let mut client = self.client.clone();
let query_block: QueryBlock = query_block.into();
let request = Request::new(QueryBlockRequest {
query_block: Some(query_block.into()),
});
let block = client
.query_block(request)
.await
.context(TonicSnafu {
context: "`get_block` request",
})?
.into_inner()
.try_into()
.context(MessageConversionSnafu {
context: "`get_block`".to_owned(),
})?;
Ok(block)
}
async fn query_blocks_since(
&self,
previous_block_hash: H256,
depth: usize,
) -> Result<BlocksSince> {
let mut client = self.client.clone();
let request = Request::new(QueryBlocksSinceRequest {
previous_block: Some(previous_block_hash.into()),
depth: depth as u64,
});
let diff = client
.query_blocks_since(request)
.await
.context(TonicSnafu {
context: "`get_block_diff` request",
})?
.into_inner()
.try_into()
.context(MessageConversionSnafu {
context: "`get_block_diff`".to_owned(),
})?;
Ok(diff)
}
async fn subscribe_blocks(
&self,
confirmations: usize,
) -> Result<Pin<Box<dyn Stream<Item = Result<BlockStreamItem>> + Send>>> {
let mut client = self.client.clone();
let request = Request::new(SubscribeNewBlocksRequest {
confirmations: confirmations as u64,
});
let stream = client
.subscribe_new_blocks(request)
.await
.context(TonicSnafu {
context: "`subscribe_blocks` request",
})?
.into_inner();
let stream = stream.map(|b| -> Result<BlockStreamItem> {
b.context(TonicSnafu {
context: "`subscribe_blocks` stream item conversion",
})?
.try_into()
.context(MessageConversionSnafu {
context: "`subscribe_blocks` stream item conversion",
})
});
Ok(Box::pin(stream))
}
}
#[async_trait]
impl<I, S> StateServer for GrpcStateFoldClient<I, S>
where
I: serde::Serialize + Send + Sync,
S: serde::de::DeserializeOwned + Send + Sync,
{
type InitialState = I;
type State = S;
async fn query_state(
&self,
initial_state: &Self::InitialState,
query_block: impl Into<QueryBlock> + Send + 'static,
) -> Result<BlockState<Self::State>> {
let mut client = self.client.clone();
let initial_state_json = InitialState {
json_data: serde_json::to_string(&initial_state).context(SerializeSnafu)?,
};
let query_block: QueryBlock = query_block.into();
let request = Request::new(QueryStateRequest {
initial_state: Some(initial_state_json),
query_block: Some(query_block.into()),
});
let state = client
.query_state(request)
.await
.context(TonicSnafu {
context: "`get_state` request",
})?
.into_inner()
.try_into()
.context(StateConversionSnafu {
context: "`get_state`".to_owned(),
})?;
Ok(state)
}
async fn query_states_since(
&self,
initial_state: &Self::InitialState,
previous_block_hash: H256,
depth: usize,
) -> Result<StatesSince<Self::State>> {
let mut client = self.client.clone();
let initial_state_json = InitialState {
json_data: serde_json::to_string(&initial_state).context(SerializeSnafu)?,
};
let request = Request::new(QueryStatesSinceRequest {
initial_state: Some(initial_state_json),
previous_block: Some(previous_block_hash.into()),
depth: depth as u64,
});
let diff = client
.query_states_since(request)
.await
.context(TonicSnafu {
context: "`get_state_diff` request",
})?
.into_inner()
.try_into()
.context(StateConversionSnafu {
context: "`get_state_diff`".to_owned(),
})?;
Ok(diff)
}
async fn subscribe_states(
&self,
initial_state: &Self::InitialState,
confirmations: usize,
) -> Result<Pin<Box<dyn Stream<Item = Result<StateStreamItem<Self::State>>> + Send>>> {
let mut client = self.client.clone();
let initial_state_json = InitialState {
json_data: serde_json::to_string(&initial_state).context(SerializeSnafu)?,
};
let request = Request::new(SubscribeNewStatesRequest {
initial_state: Some(initial_state_json),
confirmations: confirmations as u64,
});
let stream = client
.subscribe_new_states(request)
.await
.context(TonicSnafu {
context: "`subscribe_blocks` request",
})?
.into_inner();
let stream = stream.map(|s| -> Result<StateStreamItem<Self::State>> {
s.context(TonicSnafu {
context: "`subscribe_blocks` stream item conversion",
})?
.try_into()
.context(StateConversionSnafu {
context: "`subscribe_blocks` stream item conversion",
})
});
Ok(Box::pin(stream))
}
}