mod background;
use crate::{
network_service, platform::PlatformRef, runtime_service, sync_service, transactions_service,
};
use alloc::{format, string::String, sync::Arc, vec::Vec};
use core::num::NonZeroU32;
use futures_util::future;
use smoldot::{
chain_spec,
json_rpc::{self, requests_subscriptions},
libp2p::PeerId,
};
/// Configuration for [`service`].
pub struct Config {
/// Name of the service for logging purposes. [`service`] uses it to build the log target
/// `json-rpc-<log_name>`.
pub log_name: String,
/// Handed to the requests/subscriptions state machine as the per-client request limit
/// (`max_requests_per_client`). The service registers exactly one client.
pub max_pending_requests: NonZeroU32,
/// Handed to the requests/subscriptions state machine as the per-client subscription limit
/// (`max_subscriptions_per_client`).
pub max_subscriptions: u32,
/// Forwarded to `background::start` and counted when sizing the list of abort handles.
/// NOTE(review): presumably the number of requests processed concurrently; confirm in
/// the `background` module.
pub max_parallel_requests: NonZeroU32,
/// Forwarded to `background::start` and counted when sizing the list of abort handles.
/// NOTE(review): presumably the number of subscription updates processed concurrently;
/// confirm in the `background` module.
pub max_parallel_subscription_updates: NonZeroU32,
}
/// Creates a new JSON-RPC service, returned as two halves.
///
/// The [`Frontend`] is used to queue requests and retrieve responses. The
/// [`ServicePrototype`] must be turned into the actual background processing by calling
/// [`ServicePrototype::start`].
///
/// Both halves share the same requests/subscriptions state machine, in which a single client
/// is registered.
///
/// # Panics
///
/// Panics if `max_parallel_requests + max_parallel_subscription_updates + 1` overflows a
/// `u32` or doesn't fit in a `usize`.
pub fn service(config: Config) -> (Frontend, ServicePrototype) {
    let mut requests_subscriptions =
        requests_subscriptions::RequestsSubscriptions::new(requests_subscriptions::Config {
            max_clients: 1,
            max_requests_per_client: config.max_pending_requests,
            max_subscriptions_per_client: config.max_subscriptions,
        });

    // The state machine was created just above and no client has been added yet.
    let client_id = requests_subscriptions
        .add_client_mut()
        .expect("failed to register the only client despite `max_clients` being 1");
    let requests_subscriptions = Arc::new(requests_subscriptions);

    let log_target = format!("json-rpc-{}", config.log_name);

    // Number of abort handle/registration pairs to create. The registrations are handed to
    // `background::start`, so this count has to match what the background expects.
    // Checked arithmetic is used so that an absurd configuration panics in every build profile
    // instead of silently wrapping around in release builds.
    let num_handles = config
        .max_parallel_requests
        .get()
        .checked_add(config.max_parallel_subscription_updates.get())
        .and_then(|sum| sum.checked_add(1))
        .expect("number of background abort handles overflows a u32");

    let mut background_aborts = Vec::with_capacity(
        usize::try_from(num_handles)
            .expect("number of background abort handles doesn't fit in a usize"),
    );
    let mut background_abort_registrations = Vec::with_capacity(background_aborts.capacity());
    for _ in 0..num_handles {
        let (abort, reg) = future::AbortHandle::new_pair();
        background_aborts.push(abort);
        background_abort_registrations.push(reg);
    }

    let frontend = Frontend {
        log_target: log_target.clone(),
        requests_subscriptions: requests_subscriptions.clone(),
        client_id,
        background_aborts: Arc::from(background_aborts),
    };

    let prototype = ServicePrototype {
        background_abort_registrations,
        log_target,
        requests_subscriptions,
        max_parallel_requests: config.max_parallel_requests,
        max_parallel_subscription_updates: config.max_parallel_subscription_updates,
    };

    (frontend, prototype)
}
/// Handle used to queue JSON-RPC requests and to retrieve the responses of the service.
///
/// Can be cloned; all clones share the same state machine, client and abort handles.
/// When a `Frontend` is dropped while it holds the only remaining reference to
/// `background_aborts`, every abort handle in that list is triggered.
#[derive(Clone)]
pub struct Frontend {
/// State machine holding the queued requests, the subscriptions and the pending responses.
/// Shared with the [`ServicePrototype`] created alongside this object.
requests_subscriptions:
Arc<requests_subscriptions::RequestsSubscriptions<background::SubscriptionMessage>>,
/// Identifier of the single client registered in `requests_subscriptions`.
client_id: requests_subscriptions::ClientId,
/// Target to use when emitting log entries.
log_target: String,
/// One abort handle per registration given to the [`ServicePrototype`]. Wrapped in an `Arc`
/// so that the `Drop` implementation can detect when the last clone goes away.
background_aborts: Arc<[future::AbortHandle]>,
}
impl Frontend {
pub fn queue_rpc_request(&self, json_rpc_request: String) -> Result<(), HandleRpcError> {
if let Err(error) = json_rpc::parse::parse_call(&json_rpc_request) {
log::warn!(
target: &self.log_target,
"Refused malformed JSON-RPC request: {}", error
);
return Err(HandleRpcError::MalformedJsonRpc(error));
}
log::debug!(
target: &self.log_target,
"PendingRequestsQueue <= {}",
crate::util::truncated_str(
json_rpc_request.chars().filter(|c| !c.is_control()),
100,
)
);
match self
.requests_subscriptions
.try_queue_client_request(&self.client_id, json_rpc_request)
{
Ok(()) => Ok(()),
Err(err) => {
log::warn!(
target: &self.log_target,
"Request denied due to JSON-RPC service being overloaded. This will likely \
cause the JSON-RPC client to malfunction."
);
Err(HandleRpcError::Overloaded {
json_rpc_request: err.request,
})
}
}
}
pub async fn next_json_rpc_response(&self) -> String {
let message = self
.requests_subscriptions
.next_response(&self.client_id)
.await;
log::debug!(
target: &self.log_target,
"JSON-RPC <= {}",
crate::util::truncated_str(
message.chars().filter(|c| !c.is_control()),
100,
)
);
message
}
}
impl Drop for Frontend {
fn drop(&mut self) {
if let Some(background_aborts) = Arc::get_mut(&mut self.background_aborts) {
for background_abort in background_aborts {
background_abort.abort();
}
}
}
}
/// Second half of the service returned by [`service`]. Holds everything that
/// [`ServicePrototype::start`] hands over to `background::start`.
pub struct ServicePrototype {
/// State machine holding the queued requests, the subscriptions and the pending responses.
/// Shared with the [`Frontend`] created alongside this object.
requests_subscriptions:
Arc<requests_subscriptions::RequestsSubscriptions<background::SubscriptionMessage>>,
/// Target to use when emitting log entries.
log_target: String,
/// Copied from [`Config::max_parallel_requests`].
max_parallel_requests: NonZeroU32,
/// Copied from [`Config::max_parallel_subscription_updates`].
max_parallel_subscription_updates: NonZeroU32,
/// Registrations paired one-to-one with the abort handles held by the [`Frontend`].
background_abort_registrations: Vec<future::AbortRegistration>,
}
/// Configuration for [`ServicePrototype::start`]. Passed unmodified to `background::start`.
pub struct StartConfig<'a, TPlat: PlatformRef> {
/// Platform bindings.
pub platform: TPlat,
/// Shared handle to the network service, paired with a `usize`.
/// NOTE(review): the `usize` is presumably the index of the chain within the network
/// service; confirm in the `background` module.
pub network_service: (Arc<network_service::NetworkService<TPlat>>, usize),
/// Shared handle to the sync service.
pub sync_service: Arc<sync_service::SyncService<TPlat>>,
/// Shared handle to the transactions service.
pub transactions_service: Arc<transactions_service::TransactionsService<TPlat>>,
/// Shared handle to the runtime service.
pub runtime_service: Arc<runtime_service::RuntimeService<TPlat>>,
/// Chain specification, borrowed for the lifetime `'a`.
pub chain_spec: &'a chain_spec::ChainSpec,
/// Borrowed network identity.
/// NOTE(review): presumably the identity of the local node; confirm against the caller.
pub peer_id: &'a PeerId,
/// NOTE(review): presumably the value reported by the `system_name` JSON-RPC call; confirm
/// in the `background` module.
pub system_name: String,
/// NOTE(review): presumably the value reported by the `system_version` JSON-RPC call;
/// confirm in the `background` module.
pub system_version: String,
/// 32-byte value; going by the name, the hash of the genesis block of the chain.
pub genesis_block_hash: [u8; 32],
/// 32-byte value; going by the name, the state trie root of the genesis block of the chain.
pub genesis_block_state_root: [u8; 32],
}
impl ServicePrototype {
pub fn start<TPlat: PlatformRef>(self, config: StartConfig<'_, TPlat>) {
background::start(
self.log_target.clone(),
self.requests_subscriptions.clone(),
config,
self.max_parallel_requests,
self.max_parallel_subscription_updates,
self.background_abort_registrations,
)
}
}
/// Error returned by [`Frontend::queue_rpc_request`].
#[derive(Debug, derive_more::Display)]
pub enum HandleRpcError {
/// The service refused to queue the request.
#[display(
fmt = "The JSON-RPC service cannot process this request, as it is already too busy."
)]
Overloaded {
/// The request that was refused, handed back to the caller. Used by
/// [`HandleRpcError::into_json_rpc_error`] to build an error response.
json_rpc_request: String,
},
/// The request couldn't be parsed as a JSON-RPC call. Contains the parsing error.
#[display(fmt = "The request isn't a valid JSON-RPC request: {_0}")]
MalformedJsonRpc(json_rpc::parse::ParseError),
}
impl HandleRpcError {
    /// Converts this error into a JSON-RPC error response to send back to the client.
    ///
    /// Returns `None` when no response can be built:
    ///
    /// - the error is [`HandleRpcError::MalformedJsonRpc`];
    /// - the refused request fails to parse;
    /// - the refused request carries no `id`.
    ///
    /// Otherwise the response is a server error with code `-32000` and message `Too busy`,
    /// addressed to the `id` of the refused request.
    pub fn into_json_rpc_error(self) -> Option<String> {
        match self {
            HandleRpcError::MalformedJsonRpc(_) => None,
            HandleRpcError::Overloaded { json_rpc_request } => {
                // The request is parsed again solely to recover its `id`.
                let call = json_rpc::parse::parse_call(&json_rpc_request).ok()?;
                let id = call.id_json?;
                Some(json_rpc::parse::build_error_response(
                    id,
                    json_rpc::parse::ErrorResponse::ServerError(-32000, "Too busy"),
                    None,
                ))
            }
        }
    }
}