#[cfg(feature = "client")]
pub use crate::apis::{
BlockClient, CalculateReplyForHandleResult, CodeClient, DevClient, FullProgramState,
InjectedClient, ProgramClient,
};
#[cfg(feature = "server")]
use anyhow::Result;
#[cfg(feature = "server")]
use apis::{
BlockApi, BlockServer, CodeApi, CodeServer, DevApi, DevServer, InjectedApi, InjectedServer,
ProgramApi, ProgramServer,
};
#[cfg(feature = "server")]
use ethexe_common::injected::{
AddressedInjectedTransaction, InjectedTransactionAcceptance, Promise, SignedCompactTxReceipt,
};
#[cfg(feature = "server")]
use ethexe_db::Database;
#[cfg(feature = "server")]
use ethexe_processor::{Processor, ProcessorConfig};
#[cfg(feature = "server")]
use futures::{Stream, stream::FusedStream};
#[cfg(feature = "server")]
use hyper::header::HeaderValue;
#[cfg(feature = "server")]
use jsonrpsee::{
RpcModule as JsonrpcModule,
server::{PingConfig, RpcServiceBuilder, Server, ServerHandle},
};
#[cfg(feature = "server")]
use metrics::RpcMetricsLayer;
#[cfg(feature = "server")]
use std::{
net::SocketAddr,
pin::Pin,
task::{Context, Poll},
};
#[cfg(feature = "server")]
use tokio::sync::{mpsc, oneshot};
#[cfg(feature = "server")]
use tower_http::cors::{AllowOrigin, CorsLayer};
mod apis;
#[cfg(feature = "server")]
mod errors;
#[cfg(feature = "server")]
mod metrics;
#[cfg(feature = "server")]
mod utils;
#[cfg(all(test, feature = "client"))]
mod tests;
pub const DEFAULT_BLOCK_GAS_LIMIT_MULTIPLIER: u64 = 10;
#[cfg(feature = "server")]
#[derive(Debug)]
pub enum RpcEvent {
InjectedTransaction {
transaction: AddressedInjectedTransaction,
response_sender: oneshot::Sender<InjectedTransactionAcceptance>,
},
}
#[cfg(feature = "server")]
#[derive(Debug, Clone)]
pub struct RpcConfig {
pub listen_addr: SocketAddr,
pub cors: Option<Vec<String>>,
pub gas_allowance: u64,
pub chunk_size: usize,
pub with_dev_api: bool,
}
#[cfg(feature = "server")]
pub struct RpcServer {
config: RpcConfig,
db: Database,
}
#[cfg(feature = "server")]
impl RpcServer {
pub fn new(config: RpcConfig, db: Database) -> Self {
Self { config, db }
}
pub const fn port(&self) -> u16 {
self.config.listen_addr.port()
}
pub async fn run_server(self) -> Result<(ServerHandle, RpcService)> {
let (rpc_sender, rpc_receiver) = mpsc::unbounded_channel();
let cors_layer = self.cors_layer()?;
let http_middleware = tower::ServiceBuilder::new().layer(cors_layer);
let rpc_middleware = RpcServiceBuilder::new().layer(RpcMetricsLayer);
let processor = Processor::with_config(
ProcessorConfig {
chunk_size: self.config.chunk_size,
},
self.db.clone(),
)?
.overlaid();
let server_apis = RpcServerApis {
code: CodeApi::new(self.db.clone()),
block: BlockApi::new(self.db.clone()),
program: ProgramApi::new(self.db.clone(), processor, self.config.gas_allowance),
injected: InjectedApi::new(self.db.clone(), rpc_sender),
dev: self
.config
.with_dev_api
.then(|| DevApi::new(self.db.clone())),
};
let injected_api = server_apis.injected.clone();
let server_handle = Server::builder()
.set_http_middleware(http_middleware)
.set_rpc_middleware(rpc_middleware)
.enable_ws_ping(PingConfig::default())
.build(self.config.listen_addr)
.await?
.start(server_apis.into_module());
Ok((server_handle, RpcService::new(rpc_receiver, injected_api)))
}
fn cors_layer(&self) -> Result<CorsLayer> {
let Some(cors) = self.config.cors.clone() else {
return Ok(CorsLayer::permissive());
};
let mut list = Vec::new();
for origin in cors {
list.push(HeaderValue::from_str(&origin)?)
}
Ok(CorsLayer::new().allow_origin(AllowOrigin::list(list)))
}
}
#[cfg(feature = "server")]
pub struct RpcService {
receiver: mpsc::UnboundedReceiver<RpcEvent>,
injected_api: InjectedApi,
}
#[cfg(feature = "server")]
impl RpcService {
pub fn new(receiver: mpsc::UnboundedReceiver<RpcEvent>, injected_api: InjectedApi) -> Self {
Self {
receiver,
injected_api,
}
}
pub fn receive_computed_promise(&self, promise: Promise) {
self.injected_api.on_computed_promise(promise);
}
pub fn receive_tx_receipt(&self, receipt: SignedCompactTxReceipt) {
self.injected_api.on_tx_receipt(receipt);
}
}
#[cfg(feature = "server")]
impl Stream for RpcService {
type Item = RpcEvent;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
self.receiver.poll_recv(cx)
}
}
#[cfg(feature = "server")]
impl FusedStream for RpcService {
fn is_terminated(&self) -> bool {
self.receiver.is_closed()
}
}
#[cfg(feature = "server")]
struct RpcServerApis {
pub block: BlockApi,
pub code: CodeApi,
pub injected: InjectedApi,
pub program: ProgramApi,
pub dev: Option<DevApi>,
}
#[cfg(feature = "server")]
impl RpcServerApis {
pub fn into_module(self) -> jsonrpsee::server::RpcModule<()> {
let mut module = JsonrpcModule::new(());
module
.merge(BlockServer::into_rpc(self.block))
.expect("No conflicts");
module
.merge(CodeServer::into_rpc(self.code))
.expect("No conflicts");
module
.merge(InjectedServer::into_rpc(self.injected))
.expect("No conflicts");
module
.merge(ProgramServer::into_rpc(self.program))
.expect("No conflicts");
if let Some(dev) = self.dev {
module
.merge(DevServer::into_rpc(dev))
.expect("No conflicts");
}
module
}
}