use codec::Decode;
use futures::FutureExt;
use jsonrpsee::{
core::{async_trait, RpcResult},
Extensions, PendingSubscriptionSink,
};
pub use sc_rpc_api::statement::{error::Error, StatementApiServer};
use sp_core::Bytes;
use sp_statement_store::{
OptimizedTopicFilter, StatementEvent, StatementSource, SubmitResult, TopicFilter,
};
use std::sync::Arc;
/// Log target attached to the log messages emitted by this module.
const LOG_TARGET: &str = "statement-store-rpc";
/// Upper bound (4 MiB) on the estimated JSON size, in bytes, of a single chunk of existing
/// statements sent to a subscriber.
const MAX_CHUNK_BYTES_LIMIT: usize = 4 * 1024 * 1024;
use crate::{
utils::{spawn_subscription_task, BoundedVecDeque, PendingSubscription},
SubscriptionTaskExecutor,
};
#[cfg(test)]
mod tests;
async fn send_in_chunks(
existing_statements: Vec<Vec<u8>>,
subscription_sender: async_channel::Sender<StatementEvent>,
) {
let mut iter = existing_statements.into_iter().peekable();
loop {
let mut chunk = Vec::<Bytes>::new();
let mut chunk_json_size = 0usize;
while let Some(statement) = iter.peek() {
let json_size_estimate = statement.len() * 2;
if json_size_estimate > MAX_CHUNK_BYTES_LIMIT {
iter.next();
continue;
}
if chunk_json_size + json_size_estimate > MAX_CHUNK_BYTES_LIMIT {
break;
}
let Some(statement) = iter.next() else { break };
chunk_json_size += json_size_estimate;
chunk.push(statement.into());
}
if chunk.is_empty() {
break;
}
let remaining = iter.len();
if let Err(e) = subscription_sender
.send(StatementEvent::NewStatements {
statements: chunk,
remaining: Some(remaining as u32),
})
.await
{
log::warn!(
target: LOG_TARGET,
"Failed to send existing statement in subscription: {:?}", e
);
break;
}
}
}
/// Combined interface required from the backing store: the core
/// [`sp_statement_store::StatementStore`] operations together with
/// [`sc_statement_store::StatementStoreSubscriptionApi`].
///
/// The trait declares no items of its own; it only bundles its two supertraits so that they can
/// be used behind a single trait object.
pub trait StatementStoreApi:
    sp_statement_store::StatementStore + sc_statement_store::StatementStoreSubscriptionApi
{
}

// Blanket implementation: every type implementing both supertraits is a `StatementStoreApi`.
impl<T: sp_statement_store::StatementStore + sc_statement_store::StatementStoreSubscriptionApi>
    StatementStoreApi for T
{
}
/// Statement store RPC handler; implements [`StatementApiServer`].
pub struct StatementStore {
    /// Shared handle to the store that requests are forwarded to.
    store: Arc<dyn StatementStoreApi>,
    /// Executor on which the subscription tasks are spawned.
    executor: SubscriptionTaskExecutor,
}

impl StatementStore {
    /// Builds a handler from the backing `store` and the `executor` used for subscription tasks.
    pub fn new(store: Arc<dyn StatementStoreApi>, executor: SubscriptionTaskExecutor) -> Self {
        Self { store, executor }
    }
}
#[async_trait]
impl StatementApiServer for StatementStore {
fn submit(&self, encoded: Bytes) -> RpcResult<SubmitResult> {
let statement = Decode::decode(&mut &*encoded)
.map_err(|e| Error::StatementStore(format!("Error decoding statement: {:?}", e)))?;
match self.store.submit(statement, StatementSource::Local) {
SubmitResult::InternalError(e) => Err(Error::StatementStore(e.to_string()).into()),
result => Ok(result),
}
}
fn subscribe_statement(
&self,
pending: PendingSubscriptionSink,
_ext: &Extensions,
topic_filter: TopicFilter,
) {
let optimized_topic_filter: OptimizedTopicFilter = topic_filter.into();
let (existing_statements, subscription_sender, subscription_stream) =
match self.store.subscribe_statement(optimized_topic_filter) {
Ok(res) => res,
Err(err) => {
spawn_subscription_task(
&self.executor,
pending.reject(Error::StatementStore(format!(
"Error collecting existing statements: {:?}",
err
))),
);
return;
},
};
spawn_subscription_task(
&self.executor,
PendingSubscription::from(pending)
.pipe_from_stream(subscription_stream, BoundedVecDeque::new(128)),
);
self.executor.spawn(
"statement-store-rpc-send",
Some("rpc"),
send_in_chunks(existing_statements, subscription_sender).boxed(),
)
}
}