use jsonrpcmsg::Params;
use crate::{
ConnectionTo, Dispatch, HandleDispatchFrom, Handled, JsonRpcNotification, JsonRpcRequest,
JsonRpcResponse, Responder, ResponseRouter, UntypedMessage,
role::{HasPeer, Role, handle_incoming_dispatch},
util::json_cast,
};
/// Chainable matcher over a single untyped [`Dispatch`].
///
/// Each `if_*` method tries to interpret the pending message as one particular
/// typed message and, on a match, runs the supplied handler. Once a handler
/// reports `Handled::Yes` or an error is recorded, later `if_*` calls leave the
/// state untouched.
#[must_use]
#[derive(Debug)]
pub struct MatchDispatch {
/// `Ok(Handled::No { .. })` while the message is still pending,
/// `Ok(Handled::Yes)` once a handler claimed it, or `Err` if parsing or a
/// handler failed.
state: Result<Handled<Dispatch>, crate::Error>,
}
impl MatchDispatch {
/// Starts a matching chain for `message`.
///
/// The message begins unhandled, with its retry flag cleared.
pub fn new(message: Dispatch) -> Self {
    let pending = Handled::No {
        message,
        retry: false,
    };
    Self { state: Ok(pending) }
}
/// Resumes a matching chain from an existing outcome.
///
/// `state` is stored as-is, so a pending message keeps its retry flag and an
/// already-handled or failed outcome makes every later `if_*` call a no-op.
pub fn from_handled(state: Result<Handled<Dispatch>, crate::Error>) -> Self {
Self { state }
}
/// Runs `op` if the pending message is a request whose method matches `Req`.
///
/// The request is parsed into `Req` and its responder is cast to the typed
/// responder before `op` is invoked. If `op` declines (its result converts to
/// `Handled::No`), the returned request and responder are converted back to
/// their untyped forms so later matchers can inspect them, and the retry flags
/// are OR-ed together.
///
/// Notifications, responses, requests for other methods, and already-settled
/// states pass through unchanged. Parse, handler, and re-serialization errors
/// become the matcher's error state.
pub async fn if_request<Req: JsonRpcRequest, H>(
    mut self,
    op: impl AsyncFnOnce(Req, Responder<Req::Response>) -> Result<H, crate::Error>,
) -> Self
where
    H: crate::IntoHandled<(Req, Responder<Req::Response>)>,
{
    // Only a still-pending message can be matched.
    let Ok(Handled::No {
        message: pending,
        retry,
    }) = self.state
    else {
        return self;
    };

    let (raw_request, raw_responder) = match pending {
        Dispatch::Request(request, responder) => (request, responder),
        other @ (Dispatch::Notification(_) | Dispatch::Response(_, _)) => {
            self.state = Ok(Handled::No {
                message: other,
                retry,
            });
            return self;
        }
    };

    if !Req::matches_method(raw_request.method()) {
        self.state = Ok(Handled::No {
            message: Dispatch::Request(raw_request, raw_responder),
            retry,
        });
        return self;
    }

    let request = match Req::parse_message(raw_request.method(), raw_request.params()) {
        Ok(request) => request,
        Err(err) => {
            self.state = Err(err);
            return self;
        }
    };

    let outcome = match op(request, raw_responder.cast()).await {
        Ok(result) => result.into_handled(),
        Err(err) => {
            self.state = Err(err);
            return self;
        }
    };

    self.state = match outcome {
        Handled::Yes => Ok(Handled::Yes),
        // The handler handed the message back: erase the types again so the
        // next matcher sees an ordinary untyped request.
        Handled::No {
            message: (request, responder),
            retry: declined_retry,
        } => request.to_untyped_message().map(|untyped| Handled::No {
            message: Dispatch::Request(untyped, responder.erase_to_json()),
            retry: retry | declined_retry,
        }),
    };
    self
}
/// Runs `op` if the pending message is a notification whose method matches `N`.
///
/// The notification is parsed into `N` before `op` is invoked. If `op`
/// declines (its result converts to `Handled::No`), the returned notification
/// is converted back to its untyped form and the retry flags are OR-ed
/// together.
///
/// Requests, responses, notifications for other methods, and already-settled
/// states pass through unchanged. Parse, handler, and re-serialization errors
/// become the matcher's error state.
pub async fn if_notification<N: JsonRpcNotification, H>(
    mut self,
    op: impl AsyncFnOnce(N) -> Result<H, crate::Error>,
) -> Self
where
    H: crate::IntoHandled<N>,
{
    // Only a still-pending message can be matched.
    let Ok(Handled::No {
        message: pending,
        retry,
    }) = self.state
    else {
        return self;
    };

    let raw_notification = match pending {
        Dispatch::Notification(notification) => notification,
        other @ (Dispatch::Request(_, _) | Dispatch::Response(_, _)) => {
            self.state = Ok(Handled::No {
                message: other,
                retry,
            });
            return self;
        }
    };

    if !N::matches_method(raw_notification.method()) {
        self.state = Ok(Handled::No {
            message: Dispatch::Notification(raw_notification),
            retry,
        });
        return self;
    }

    let notification =
        match N::parse_message(raw_notification.method(), raw_notification.params()) {
            Ok(notification) => notification,
            Err(err) => {
                self.state = Err(err);
                return self;
            }
        };

    let outcome = match op(notification).await {
        Ok(result) => result.into_handled(),
        Err(err) => {
            self.state = Err(err);
            return self;
        }
    };

    self.state = match outcome {
        Handled::Yes => Ok(Handled::Yes),
        // Declined: hand an untyped notification on to the next matcher.
        Handled::No {
            message: notification,
            retry: declined_retry,
        } => notification.to_untyped_message().map(|untyped| Handled::No {
            message: Dispatch::Notification(untyped),
            retry: retry | declined_retry,
        }),
    };
    self
}
pub async fn if_message<R: JsonRpcRequest, N: JsonRpcNotification, H>(
mut self,
op: impl AsyncFnOnce(Dispatch<R, N>) -> Result<H, crate::Error>,
) -> Self
where
H: crate::IntoHandled<Dispatch<R, N>>,
{
if let Ok(Handled::No {
message: dispatch,
retry,
}) = self.state
{
self.state = match dispatch.into_typed_dispatch::<R, N>() {
Ok(Ok(typed_dispatch)) => match op(typed_dispatch).await {
Ok(result) => match result.into_handled() {
Handled::Yes => Ok(Handled::Yes),
Handled::No {
message: typed_dispatch,
retry: message_retry,
} => {
let untyped = match typed_dispatch {
Dispatch::Request(request, responder) => {
match request.to_untyped_message() {
Ok(untyped) => {
Dispatch::Request(untyped, responder.erase_to_json())
}
Err(err) => return Self { state: Err(err) },
}
}
Dispatch::Notification(notification) => {
match notification.to_untyped_message() {
Ok(untyped) => Dispatch::Notification(untyped),
Err(err) => return Self { state: Err(err) },
}
}
Dispatch::Response(result, router) => {
let method = router.method();
let untyped_result = match result {
Ok(response) => match response.into_json(method) {
Ok(json) => Ok(json),
Err(err) => return Self { state: Err(err) },
},
Err(err) => Err(err),
};
Dispatch::Response(untyped_result, router.erase_to_json())
}
};
Ok(Handled::No {
message: untyped,
retry: retry | message_retry,
})
}
},
Err(err) => Err(err),
},
Ok(Err(dispatch)) => Ok(Handled::No {
message: dispatch,
retry,
}),
Err(err) => Err(err),
};
}
self
}
/// Runs `op` if the pending message is a response whose router method matches
/// `Req`.
///
/// The router is cast to the typed router and a successful payload is decoded
/// with `Req::Response::from_value`; a decode failure is passed to `op` as the
/// `Err` side of the result rather than aborting the chain. If `op` declines
/// (its result converts to `Handled::No`), the returned result and router are
/// converted back to their untyped forms and the retry flags are OR-ed
/// together. A failure to re-encode the response is carried inside the
/// forwarded response result.
///
/// Requests, notifications, responses to other methods, and already-settled
/// states pass through unchanged. An error returned by `op` becomes the
/// matcher's error state.
pub async fn if_response_to<Req: JsonRpcRequest, H>(
    mut self,
    op: impl AsyncFnOnce(
        Result<Req::Response, crate::Error>,
        ResponseRouter<Req::Response>,
    ) -> Result<H, crate::Error>,
) -> Self
where
    H: crate::IntoHandled<(
        Result<Req::Response, crate::Error>,
        ResponseRouter<Req::Response>,
    )>,
{
    // Only a still-pending message can be matched.
    let Ok(Handled::No {
        message: pending,
        retry,
    }) = self.state
    else {
        return self;
    };

    let (raw_result, raw_router) = match pending {
        Dispatch::Response(result, router) => (result, router),
        other @ (Dispatch::Request(_, _) | Dispatch::Notification(_)) => {
            self.state = Ok(Handled::No {
                message: other,
                retry,
            });
            return self;
        }
    };

    if !Req::matches_method(raw_router.method()) {
        self.state = Ok(Handled::No {
            message: Dispatch::Response(raw_result, raw_router),
            retry,
        });
        return self;
    }

    let router: ResponseRouter<Req::Response> = raw_router.cast();
    let result = raw_result.and_then(|value| Req::Response::from_value(router.method(), value));

    self.state = match op(result, router).await {
        Err(err) => Err(err),
        Ok(handler_result) => Ok(match handler_result.into_handled() {
            Handled::Yes => Handled::Yes,
            Handled::No {
                message: (result, router),
                retry: declined_retry,
            } => {
                // Re-encode before erasing the router, which consumes it.
                let json_result =
                    result.and_then(|response| response.into_json(router.method()));
                Handled::No {
                    message: Dispatch::Response(json_result, router.erase_to_json()),
                    retry: retry | declined_retry,
                }
            }
        }),
    };
    self
}
/// Like `if_response_to`, but `op` only sees successful responses.
///
/// An error response is left pending (with its retry flag cleared by this
/// step) so that a later matcher can deal with it. When `op` declines, the
/// response it returns is wrapped back into `Ok` and forwarded together with
/// the handler's retry flag.
pub async fn if_ok_response_to<Req: JsonRpcRequest, H>(
    self,
    op: impl AsyncFnOnce(Req::Response, ResponseRouter<Req::Response>) -> Result<H, crate::Error>,
) -> Self
where
    H: crate::IntoHandled<(Req::Response, ResponseRouter<Req::Response>)>,
{
    self.if_response_to::<Req, _>(async move |result, router| match result {
        // Failures are not this handler's business; pass them along untouched.
        Err(err) => Ok(Handled::No {
            message: (Err(err), router),
            retry: false,
        }),
        Ok(response) => Ok(match op(response, router).await?.into_handled() {
            Handled::Yes => Handled::Yes,
            Handled::No {
                message: (response, router),
                retry,
            } => Handled::No {
                message: (Ok(response), router),
                retry,
            },
        }),
    })
    .await
}
/// Ends the chain and returns the accumulated outcome: `Handled::Yes`, the
/// still-pending message with its retry flag, or the recorded error.
pub fn done(self) -> Result<Handled<Dispatch>, crate::Error> {
self.state
}
/// Ends the chain, passing a still-pending message to the fallback `op`.
///
/// Returns `Ok(())` without calling `op` if an earlier handler claimed the
/// message, and propagates any recorded error. The retry flag is discarded.
pub async fn otherwise(
    self,
    op: impl AsyncFnOnce(Dispatch) -> Result<(), crate::Error>,
) -> Result<(), crate::Error> {
    match self.state? {
        Handled::Yes => Ok(()),
        Handled::No { message, .. } => op(message).await,
    }
}
/// Ends the chain, dropping the message if no handler claimed it.
///
/// Only a recorded error is reported; both handled and unhandled outcomes
/// yield `Ok(())`.
pub fn otherwise_ignore(self) -> Result<(), crate::Error> {
    self.state.map(|_| ())
}
}
/// Chainable matcher over a single untyped [`Dispatch`] that also carries the
/// connection the message arrived on.
///
/// The `*_from` methods route the pending message through
/// `handle_incoming_dispatch` together with the connection's counterpart and
/// the requested peer role before applying a typed handler.
#[must_use]
#[derive(Debug)]
pub struct MatchDispatchFrom<Counterpart: Role> {
/// `Ok(Handled::No { .. })` while the message is still pending,
/// `Ok(Handled::Yes)` once a handler claimed it, or `Err` on failure.
state: Result<Handled<Dispatch>, crate::Error>,
/// Clone of the connection supplied to `new`; passed to
/// `handle_incoming_dispatch` and to delegated handlers.
connection: ConnectionTo<Counterpart>,
}
impl<Counterpart: Role> MatchDispatchFrom<Counterpart> {
/// Starts a matching chain for `message` received over `cx`.
///
/// The message begins unhandled with its retry flag cleared; the connection
/// handle is cloned and kept for the lifetime of the chain.
pub fn new(message: Dispatch, cx: &ConnectionTo<Counterpart>) -> Self {
    let state = Ok(Handled::No {
        message,
        retry: false,
    });
    let connection = cx.clone();
    Self { state, connection }
}
/// Runs `op` if the pending message is a `Req` request.
///
/// Shorthand for `if_request_from` with the connection's own counterpart used
/// as the peer.
pub async fn if_request<Req: JsonRpcRequest, H>(
self,
op: impl AsyncFnOnce(Req, Responder<Req::Response>) -> Result<H, crate::Error>,
) -> Self
where
Counterpart: HasPeer<Counterpart>,
H: crate::IntoHandled<(Req, Responder<Req::Response>)>,
{
let counterpart = self.connection.counterpart();
self.if_request_from(counterpart, op).await
}
pub async fn if_request_from<Peer: Role, Req: JsonRpcRequest, H>(
mut self,
peer: Peer,
op: impl AsyncFnOnce(Req, Responder<Req::Response>) -> Result<H, crate::Error>,
) -> Self
where
Counterpart: HasPeer<Peer>,
H: crate::IntoHandled<(Req, Responder<Req::Response>)>,
{
if let Ok(Handled::No { message, retry: _ }) = self.state {
self.state = handle_incoming_dispatch(
self.connection.counterpart(),
peer,
message,
self.connection.clone(),
async |dispatch, _connection| {
MatchDispatch::new(dispatch).if_request(op).await.done()
},
)
.await;
}
self
}
/// Runs `op` if the pending message is an `N` notification.
///
/// Shorthand for `if_notification_from` with the connection's own counterpart
/// used as the peer.
pub async fn if_notification<N: JsonRpcNotification, H>(
self,
op: impl AsyncFnOnce(N) -> Result<H, crate::Error>,
) -> Self
where
Counterpart: HasPeer<Counterpart>,
H: crate::IntoHandled<N>,
{
let counterpart = self.connection.counterpart();
self.if_notification_from(counterpart, op).await
}
pub async fn if_notification_from<Peer: Role, N: JsonRpcNotification, H>(
mut self,
peer: Peer,
op: impl AsyncFnOnce(N) -> Result<H, crate::Error>,
) -> Self
where
Counterpart: HasPeer<Peer>,
H: crate::IntoHandled<N>,
{
if let Ok(Handled::No { message, retry: _ }) = self.state {
self.state = handle_incoming_dispatch(
self.connection.counterpart(),
peer,
message,
self.connection.clone(),
async |dispatch, _connection| {
MatchDispatch::new(dispatch)
.if_notification(op)
.await
.done()
},
)
.await;
}
self
}
pub async fn if_message_from<Peer: Role, Req: JsonRpcRequest, N: JsonRpcNotification, H>(
mut self,
peer: Peer,
op: impl AsyncFnOnce(Dispatch<Req, N>) -> Result<H, crate::Error>,
) -> Self
where
Counterpart: HasPeer<Peer>,
H: crate::IntoHandled<Dispatch<Req, N>>,
{
if let Ok(Handled::No { message, retry: _ }) = self.state {
self.state = handle_incoming_dispatch(
self.connection.counterpart(),
peer,
message,
self.connection.clone(),
async |dispatch, _connection| {
MatchDispatch::new(dispatch).if_message(op).await.done()
},
)
.await;
}
self
}
/// Runs `op` if the pending message is a response whose router method matches
/// `Req`.
///
/// Unlike the `*_from` methods, this does not go through
/// `handle_incoming_dispatch`; it applies `MatchDispatch::if_response_to`
/// directly to the current state.
///
/// The chain is resumed with `MatchDispatch::from_handled`, so the retry flag
/// accumulated by earlier steps is kept and OR-ed with the handler's flag
/// instead of being reset. Already-settled states pass through unchanged.
pub async fn if_response_to<Req: JsonRpcRequest, H>(
    mut self,
    op: impl AsyncFnOnce(
        Result<Req::Response, crate::Error>,
        ResponseRouter<Req::Response>,
    ) -> Result<H, crate::Error>,
) -> Self
where
    H: crate::IntoHandled<(
        Result<Req::Response, crate::Error>,
        ResponseRouter<Req::Response>,
    )>,
{
    // `MatchDispatch::if_response_to` already ignores handled and failed
    // states, so the whole state can be handed over without pre-filtering.
    self.state = MatchDispatch::from_handled(self.state)
        .if_response_to::<Req, H>(op)
        .await
        .done();
    self
}
/// Runs `op` if the pending message is a successful response to a `Req`
/// request.
///
/// Shorthand for `if_ok_response_to_from` with the connection's own
/// counterpart used as the peer.
pub async fn if_ok_response_to<Req: JsonRpcRequest, H>(
self,
op: impl AsyncFnOnce(Req::Response, ResponseRouter<Req::Response>) -> Result<H, crate::Error>,
) -> Self
where
Counterpart: HasPeer<Counterpart>,
H: crate::IntoHandled<(Req::Response, ResponseRouter<Req::Response>)>,
{
let counterpart = self.connection.counterpart();
self.if_ok_response_to_from::<Req, Counterpart, H>(counterpart, op)
.await
}
pub async fn if_response_to_from<Req: JsonRpcRequest, Peer: Role, H>(
mut self,
peer: Peer,
op: impl AsyncFnOnce(
Result<Req::Response, crate::Error>,
ResponseRouter<Req::Response>,
) -> Result<H, crate::Error>,
) -> Self
where
Counterpart: HasPeer<Peer>,
H: crate::IntoHandled<(
Result<Req::Response, crate::Error>,
ResponseRouter<Req::Response>,
)>,
{
if let Ok(Handled::No { message, retry: _ }) = self.state {
self.state = handle_incoming_dispatch(
self.connection.counterpart(),
peer,
message,
self.connection.clone(),
async |dispatch, _connection| {
MatchDispatch::new(dispatch)
.if_response_to::<Req, H>(op)
.await
.done()
},
)
.await;
}
self
}
/// Like `if_response_to_from`, but `op` only sees successful responses.
///
/// An error response is handed back as unhandled (with `retry: false` from
/// this step) so that a later matcher can deal with it. When `op` declines,
/// the response it returns is wrapped back into `Ok` and forwarded together
/// with the handler's retry flag.
pub async fn if_ok_response_to_from<Req: JsonRpcRequest, Peer: Role, H>(
    self,
    peer: Peer,
    op: impl AsyncFnOnce(Req::Response, ResponseRouter<Req::Response>) -> Result<H, crate::Error>,
) -> Self
where
    Counterpart: HasPeer<Peer>,
    H: crate::IntoHandled<(Req::Response, ResponseRouter<Req::Response>)>,
{
    self.if_response_to_from::<Req, _, _>(peer, async move |result, router| match result {
        // Failures are not this handler's business; pass them along untouched.
        Err(err) => Ok(Handled::No {
            message: (Err(err), router),
            retry: false,
        }),
        Ok(response) => Ok(match op(response, router).await?.into_handled() {
            Handled::Yes => Handled::Yes,
            Handled::No {
                message: (response, router),
                retry,
            } => Handled::No {
                message: (Ok(response), router),
                retry,
            },
        }),
    })
    .await
}
pub fn done(self) -> Result<Handled<Dispatch>, crate::Error> {
match self.state {
Ok(Handled::Yes) => Ok(Handled::Yes),
Ok(Handled::No { message, retry }) => Ok(Handled::No { message, retry }),
Err(err) => Err(err),
}
}
/// Ends the chain, passing a still-pending message to the fallback `op`.
///
/// Returns `Ok(())` without calling `op` if an earlier handler claimed the
/// message, and propagates any recorded error. The retry flag is discarded.
pub async fn otherwise(
    self,
    op: impl AsyncFnOnce(Dispatch) -> Result<(), crate::Error>,
) -> Result<(), crate::Error> {
    match self.state? {
        Handled::Yes => Ok(()),
        Handled::No { message, .. } => op(message).await,
    }
}
/// Ends the chain by offering a still-pending message to another handler.
///
/// If the message was already claimed, `Handled::Yes` is returned and
/// `handler` is never called. Otherwise `handler.handle_dispatch_from` receives
/// the message and a clone of the connection. When it also declines, the
/// message it returns is reported as unhandled with the retry flag set if
/// either this chain or `handler` requested a retry. Errors from the chain or
/// from `handler` are propagated.
pub async fn otherwise_delegate(
    self,
    mut handler: impl HandleDispatchFrom<Counterpart>,
) -> Result<Handled<Dispatch>, crate::Error> {
    let (pending, outer_retry) = match self.state? {
        Handled::Yes => return Ok(Handled::Yes),
        Handled::No { message, retry } => (message, retry),
    };

    let delegated = handler
        .handle_dispatch_from(pending, self.connection.clone())
        .await?;

    Ok(match delegated {
        Handled::Yes => Handled::Yes,
        Handled::No {
            message,
            retry: inner_retry,
        } => Handled::No {
            message,
            retry: inner_retry | outer_retry,
        },
    })
}
}
/// Chainable matcher that dispatches one untyped notification to the first
/// typed handler whose method matches.
#[must_use]
#[derive(Debug)]
pub struct TypeNotification<R: Role> {
/// Clone of the connection supplied to `new`; used to report parse and
/// message-construction errors via `send_error_notification`.
cx: ConnectionTo<R>,
/// Always `Some` between method calls; each method `take`s it while it works.
state: Option<TypeNotificationState>,
}
/// Progress of a [`TypeNotification`] chain.
#[derive(Debug)]
enum TypeNotificationState {
/// No handler has matched yet; holds the method name and its parameters.
Unhandled(String, Option<Params>),
/// A handler ran, or parsing failed and the error was reported; holds the
/// result to hand back to the caller.
Handled(Result<(), crate::Error>),
}
impl<R: Role> TypeNotification<R> {
/// Starts a matching chain for the notification `request` received over `cx`.
///
/// The message's parameters are converted to `Option<Params>` up front and the
/// connection handle is cloned.
///
/// # Panics
///
/// Panics with `"valid params"` if `json_cast` cannot convert the message's
/// parameters into `Option<Params>`.
pub fn new(request: UntypedMessage, cx: &ConnectionTo<R>) -> Self {
    let typed_params: Option<Params> = json_cast(request.params).expect("valid params");
    let pending = TypeNotificationState::Unhandled(request.method, typed_params);
    Self {
        cx: cx.clone(),
        state: Some(pending),
    }
}
/// Runs `op` if the notification is still unhandled and its method matches `N`.
///
/// On a match the parameters are parsed into `N`. A successful parse hands the
/// typed notification to `op` and records its result; a failed parse is
/// reported through `send_error_notification` and that call's result is
/// recorded instead. Either way the chain is then settled and later
/// `handle_if` calls are no-ops. A non-matching method leaves the notification
/// pending.
///
/// # Panics
///
/// Panics with `"valid state"` if the internal state is missing, which would
/// indicate a bug in this type rather than bad input.
pub async fn handle_if<N: JsonRpcNotification>(
    mut self,
    op: impl AsyncFnOnce(N) -> Result<(), crate::Error>,
) -> Self {
    self.state = Some(match self.state.take().expect("valid state") {
        TypeNotificationState::Unhandled(method, params) => {
            if N::matches_method(&method) {
                // Borrow the parameters for parsing (the source had a
                // mis-encoded `&params` token here).
                match N::parse_message(&method, &params) {
                    Ok(request) => TypeNotificationState::Handled(op(request).await),
                    Err(err) => {
                        TypeNotificationState::Handled(self.cx.send_error_notification(err))
                    }
                }
            } else {
                TypeNotificationState::Unhandled(method, params)
            }
        }
        // Already settled by an earlier handler: carry the result forward.
        TypeNotificationState::Handled(outcome) => TypeNotificationState::Handled(outcome),
    });
    self
}
/// Ends the chain, passing a still-unhandled notification to the fallback `op`.
///
/// If an earlier handler already settled the chain, its result is returned and
/// `op` is not called. Otherwise the method and parameters are rebuilt into an
/// `UntypedMessage` for `op`; if that rebuild fails, the error is reported
/// through `send_error_notification` and that call's result is returned.
///
/// # Panics
///
/// Panics with `"valid state"` if the internal state is missing, which would
/// indicate a bug in this type rather than bad input.
pub async fn otherwise(
    mut self,
    op: impl AsyncFnOnce(UntypedMessage) -> Result<(), crate::Error>,
) -> Result<(), crate::Error> {
    let (method, params) = match self.state.take().expect("valid state") {
        TypeNotificationState::Handled(outcome) => return outcome,
        TypeNotificationState::Unhandled(method, params) => (method, params),
    };

    match UntypedMessage::new(&method, params) {
        Ok(message) => op(message).await,
        Err(err) => self.cx.send_error_notification(err),
    }
}
}