use std::time::Duration;
use jsonrpsee_types::{ErrorCode, ErrorObject, Id, InvalidRequest, Response, ResponsePayload};
use serde_json::value::RawValue;
use tokio::sync::mpsc;
use super::{DisconnectError, SendTimeoutError, TrySendError};
/// Sending half used to push serialized JSON (`Box<RawValue>`) messages into a
/// `tokio::sync::mpsc` channel, together with a response size limit.
///
/// Cloning yields another handle built from a clone of the same sender.
#[derive(Clone, Debug)]
pub struct MethodSink {
/// Channel sender that the messages are written to.
tx: mpsc::Sender<Box<RawValue>>,
/// Response size limit exposed through `max_response_size()`.
/// NOTE(review): the unit is presumably bytes and the limit is not enforced
/// anywhere in this type's own methods — confirm against the callers.
max_response_size: u32,
}
impl MethodSink {
    /// Creates a sink over `tx` with the response size limit set to `u32::MAX`.
    pub fn new(tx: mpsc::Sender<Box<RawValue>>) -> Self {
        Self::new_with_limit(tx, u32::MAX)
    }

    /// Creates a sink over `tx` with an explicit response size limit.
    pub fn new_with_limit(tx: mpsc::Sender<Box<RawValue>>, max_response_size: u32) -> Self {
        Self { tx, max_response_size }
    }

    /// Returns `true` if the underlying channel reports itself as closed.
    pub fn is_closed(&self) -> bool {
        self.tx.is_closed()
    }

    /// Completes once the underlying channel has been closed.
    pub async fn closed(&self) {
        self.tx.closed().await;
    }

    /// Returns the response size limit this sink was constructed with.
    pub const fn max_response_size(&self) -> u32 {
        self.max_response_size
    }

    /// Attempts to enqueue `msg` without waiting.
    ///
    /// # Errors
    ///
    /// Returns the channel's try-send failure converted into [`TrySendError`].
    pub fn try_send(&mut self, msg: Box<RawValue>) -> Result<(), TrySendError> {
        match self.tx.try_send(msg) {
            Ok(()) => Ok(()),
            Err(err) => Err(err.into()),
        }
    }

    /// Enqueues `msg`, waiting asynchronously on the channel send.
    ///
    /// # Errors
    ///
    /// Returns the channel's send failure converted into [`DisconnectError`].
    pub async fn send(&self, msg: Box<RawValue>) -> Result<(), DisconnectError> {
        let outcome = self.tx.send(msg).await;
        outcome.map_err(|err| err.into())
    }

    /// Serializes an error response for `id` carrying `err` and sends it via [`Self::send`].
    ///
    /// # Panics
    ///
    /// Panics if the response cannot be serialized to JSON.
    ///
    /// # Errors
    ///
    /// Propagates the [`DisconnectError`] returned by [`Self::send`].
    pub async fn send_error<'a>(&self, id: Id<'a>, err: ErrorObject<'a>) -> Result<(), DisconnectError> {
        // Build and serialize inside a nested scope so the borrowed response is
        // dropped before the send is awaited.
        let json = {
            let payload = ResponsePayload::<()>::error_borrowed(err);
            let response = Response::new(payload, id);
            serde_json::value::to_raw_value(&response).expect("valid JSON; qed")
        };
        self.send(json).await
    }

    /// Enqueues `msg`, passing `timeout` to the channel's timed send.
    ///
    /// # Errors
    ///
    /// Returns the channel's timed-send failure converted into [`SendTimeoutError`].
    pub async fn send_timeout(&self, msg: Box<RawValue>, timeout: Duration) -> Result<(), SendTimeoutError> {
        match self.tx.send_timeout(msg, timeout).await {
            Ok(()) => Ok(()),
            Err(err) => Err(err.into()),
        }
    }

    /// Returns the capacity currently reported by the underlying channel.
    pub fn capacity(&self) -> usize {
        self.tx.capacity()
    }

    /// Returns the maximum capacity reported by the underlying channel.
    pub fn max_capacity(&self) -> usize {
        self.tx.max_capacity()
    }

    /// Waits until a slot can be reserved on the underlying channel.
    ///
    /// The reserved permit is discarded right away; it is only used to observe
    /// that capacity was available.
    ///
    /// # Errors
    ///
    /// Returns a [`DisconnectError`] carrying a JSON `null` value if the
    /// reservation fails.
    pub async fn has_capacity(&self) -> Result<(), DisconnectError> {
        self.tx
            .reserve()
            .await
            .map(|_permit| ())
            .map_err(|_| DisconnectError(RawValue::NULL.to_owned().into()))
    }
}
/// Picks the request id and error code to report for a raw request body.
///
/// If `data` deserializes as an [`InvalidRequest`], its `id` is returned together
/// with [`ErrorCode::InvalidRequest`]. Otherwise the result is [`Id::Null`]
/// paired with [`ErrorCode::ParseError`].
pub fn prepare_error(data: &[u8]) -> (Id<'_>, ErrorCode) {
    if let Ok(InvalidRequest { id }) = serde_json::from_slice::<InvalidRequest>(data) {
        return (id, ErrorCode::InvalidRequest);
    }

    // Nothing usable could be extracted, so fall back to a null id.
    (Id::Null, ErrorCode::ParseError)
}