mod background;
use crate::{
log, network_service, platform::PlatformRef, runtime_service, sync_service,
transactions_service,
};
use alloc::{
borrow::Cow,
boxed::Box,
format,
string::{String, ToString as _},
sync::Arc,
};
use core::{num::NonZero, pin::Pin};
use futures_lite::StreamExt as _;
pub struct Config<TPlat: PlatformRef> {
pub platform: TPlat,
pub log_name: String,
#[allow(unused)]
pub max_pending_requests: NonZero<u32>,
#[allow(unused)]
pub max_subscriptions: u32,
pub network_service: Arc<network_service::NetworkServiceChain<TPlat>>,
pub sync_service: Arc<sync_service::SyncService<TPlat>>,
pub transactions_service: Arc<transactions_service::TransactionsService<TPlat>>,
pub runtime_service: Arc<runtime_service::RuntimeService<TPlat>>,
pub chain_name: String,
pub chain_ty: String,
pub chain_properties_json: String,
pub chain_is_live: bool,
pub system_name: String,
pub system_version: String,
pub genesis_block_hash: [u8; 32],
}
pub fn service<TPlat: PlatformRef>(config: Config<TPlat>) -> Frontend<TPlat> {
let log_target = format!("json-rpc-{}", config.log_name);
let (requests_tx, requests_rx) = async_channel::unbounded(); let (responses_tx, responses_rx) = async_channel::bounded(16);
let frontend = Frontend {
platform: config.platform.clone(),
log_target: log_target.clone(),
responses_rx: Arc::new(async_lock::Mutex::new(Box::pin(responses_rx))),
requests_tx,
};
let platform = config.platform.clone();
platform.spawn_task(
Cow::Owned(log_target.clone()),
background::run(
log_target,
background::Config {
platform: config.platform,
network_service: config.network_service,
sync_service: config.sync_service,
transactions_service: config.transactions_service,
runtime_service: config.runtime_service,
chain_name: config.chain_name,
chain_ty: config.chain_ty,
chain_properties_json: config.chain_properties_json,
chain_is_live: config.chain_is_live,
system_name: config.system_name,
system_version: config.system_version,
genesis_block_hash: config.genesis_block_hash,
},
requests_rx,
responses_tx,
),
);
frontend
}
#[derive(Clone)]
pub struct Frontend<TPlat> {
platform: TPlat,
requests_tx: async_channel::Sender<String>,
responses_rx: Arc<async_lock::Mutex<Pin<Box<async_channel::Receiver<String>>>>>,
log_target: String,
}
impl<TPlat: PlatformRef> Frontend<TPlat> {
pub fn queue_rpc_request(&self, json_rpc_request: String) -> Result<(), HandleRpcError> {
let log_friendly_request =
crate::util::truncated_str(json_rpc_request.chars().filter(|c| !c.is_control()), 250)
.to_string();
match self.requests_tx.try_send(json_rpc_request) {
Ok(()) => {
log!(
&self.platform,
Debug,
&self.log_target,
"json-rpc-request-queued",
request = log_friendly_request
);
Ok(())
}
Err(err) => Err(HandleRpcError::TooManyPendingRequests {
json_rpc_request: err.into_inner(),
}),
}
}
pub async fn next_json_rpc_response(&self) -> String {
let message = match self.responses_rx.lock().await.next().await {
Some(m) => m,
None => unreachable!(),
};
log!(
&self.platform,
Debug,
&self.log_target,
"json-rpc-response-yielded",
response =
crate::util::truncated_str(message.chars().filter(|c| !c.is_control()), 250,)
);
message
}
}
#[derive(Debug, derive_more::Display, derive_more::Error)]
pub enum HandleRpcError {
#[display(
"The JSON-RPC service cannot process this request, as too many requests are already being processed."
)]
TooManyPendingRequests {
json_rpc_request: String,
},
}