use super::client::{Client, Error};
use crate::{jsonrpc, method::Method, types::Empty};
use futures::{
channel::{mpsc, oneshot},
stream::StreamExt as _,
};
use serde::Serialize;
use std::{num::NonZeroUsize, sync::Arc, time::Duration};
use tokio_stream::StreamExt as _;
pub struct Buffered {
client: Arc<Client>,
calls: mpsc::UnboundedSender<Call>,
}
struct Call {
request: jsonrpc::Request,
response: oneshot::Sender<Result<jsonrpc::Response, Error>>,
}
impl Client {
pub fn buffered(self, config: Configuration) -> Buffered {
let client = Arc::new(self);
let (calls, receiver) = mpsc::unbounded();
tokio::task::spawn(Buffered::background_worker(
client.clone(),
receiver,
config,
));
Buffered { client, calls }
}
}
impl Buffered {
pub fn client(&self) -> &Client {
&self.client
}
async fn background_worker(
client: Arc<Client>,
calls: mpsc::UnboundedReceiver<Call>,
config: Configuration,
) {
calls
.chunks_timeout(config.max_size, config.delay)
.for_each_concurrent(
config.max_concurrent_requests.map(NonZeroUsize::get),
|mut chunk| {
let client = client.clone();
async move {
match chunk.len() {
0 => (),
1 => {
let call = chunk.remove(0);
let response = client.roundtrip(call.request).await;
let _ = call.response.send(response);
}
n => {
let (requests, channels): (Vec<_>, Vec<_>) = chunk
.into_iter()
.map(|call| (call.request, call.response))
.unzip();
let responses = client
.roundtrip::<_, Vec<jsonrpc::Response>>(requests)
.await
.map(|responses| {
responses.into_iter().map(Ok).collect::<Vec<_>>()
})
.unwrap_or_else(|err| {
(0..n).map(|_| Err(err.duplicate())).collect()
});
for (channel, response) in channels.into_iter().zip(responses) {
let _ = channel.send(response);
}
}
}
}
},
)
.await;
}
async fn roundtrip(&self, request: jsonrpc::Request) -> Result<jsonrpc::Response, Error> {
async {
let (sender, receiver) = oneshot::channel();
self.calls
.unbounded_send(Call {
request,
response: sender,
})
.ok()?;
receiver.await.ok()
}
.await
.expect("background worker unexpectedly stopped")
}
pub async fn call<M>(&self, method: M, params: M::Params) -> Result<M::Result, Error>
where
M: Method + Serialize,
{
jsonrpc::call_async(method, params, |request| self.roundtrip(request)).await
}
pub async fn call_np<M>(&self, method: M) -> Result<M::Result, Error>
where
M: Method<Params = Empty> + Serialize,
{
jsonrpc::call_async(method, Empty, |request| self.roundtrip(request)).await
}
}
pub struct Configuration {
pub max_concurrent_requests: Option<NonZeroUsize>,
pub max_size: usize,
pub delay: Duration,
}
impl Default for Configuration {
fn default() -> Self {
Self {
max_concurrent_requests: NonZeroUsize::new(1),
max_size: 20,
delay: Duration::default(),
}
}
}