// Crate-wide lint configuration.
// `missing_docs` is denied, so an undocumented public item is a compile error.
#![deny(missing_docs)]
// Clippy's `type_complexity` lint is allowed for the whole crate.
#![allow(clippy::type_complexity)]
// When compiled with `--cfg docsrs`, enable the unstable `doc_cfg` feature that
// the `doc(cfg(...))` attribute further down relies on.
#![cfg_attr(docsrs, feature(doc_cfg))]
// Re-export of `serde`, present only with the `serde1` feature and hidden from
// the generated documentation.
#[cfg(feature = "serde1")]
#[doc(hidden)]
pub use serde;
// Re-exports of `tokio_serde` and `tokio_util`, present only with the
// `serde-transport` feature.
#[cfg(feature = "serde-transport")]
pub use {tokio_serde, tokio_util};
// `serde_transport` is compiled only with the `serde-transport` feature; under
// `docsrs` the `doc(cfg)` attribute records that requirement in the docs.
#[cfg(feature = "serde-transport")]
#[cfg_attr(docsrs, doc(cfg(feature = "serde-transport")))]
pub mod serde_transport;
pub mod trace;
// Items re-exported from `tarpc_plugins`; `derive_serde` additionally needs the
// `serde1` feature.
#[cfg(feature = "serde1")]
pub use tarpc_plugins::derive_serde;
pub use tarpc_plugins::service;
// `cancellations` and `util` are crate-private; the remaining modules are public.
pub(crate) mod cancellations;
pub mod client;
pub mod context;
pub mod server;
pub mod transport;
pub(crate) mod util;
// `Transport` lives in `transport::sealed` and is re-exported at the crate root.
pub use crate::transport::sealed::Transport;
// Standard-library names used by the definitions in this file.
use std::{any::Any, error::Error, io, sync::Arc, time::Instant};
/// A message produced by a client, generic over the request payload type `T`.
///
/// The enum is `#[non_exhaustive]`, so code outside this crate has to include a
/// wildcard arm when matching on it.
// NOTE(review): "client to server" direction is inferred from the type name — confirm.
#[derive(Debug)]
#[cfg_attr(feature = "serde1", derive(serde::Serialize, serde::Deserialize))]
#[non_exhaustive]
pub enum ClientMessage<T> {
/// A request, carried as a [`Request<T>`].
Request(Request<T>),
/// A cancellation that identifies a request by its numeric ID.
Cancel {
/// Trace context attached to the cancellation.
// With `serde1`, `serde(default)` fills this field with
// `trace::Context::default()` when it is absent from the input.
#[cfg_attr(feature = "serde1", serde(default))]
trace_context: trace::Context,
/// ID of the request the cancellation refers to.
request_id: u64,
},
}
/// A request: a payload of type `T` together with its context and numeric ID.
///
/// `Clone` and `Copy` are derived, so they are available only when `T`
/// implements them as well.
#[derive(Clone, Copy, Debug)]
#[cfg_attr(feature = "serde1", derive(serde::Serialize, serde::Deserialize))]
pub struct Request<T> {
/// Context of the request; [`Request::deadline`] reads its `deadline` field.
pub context: context::Context,
/// Numeric identifier of the request.
// NOTE(review): presumably the value referenced by `Response::request_id` and
// by `ClientMessage::Cancel` — confirm against the client/server modules.
pub id: u64,
/// The request payload.
pub message: T,
}
/// Associates a name with a request value.
///
/// The implementations in this file cover two cases: the smart pointers
/// [`Arc`] and [`Box`] forward to the value they point at, and a handful of
/// standard types report a fixed label.
pub trait RequestName {
    /// Returns the name of this request.
    fn name(&self) -> &str;
}

// Smart pointers are transparent: the name is whatever the pointee reports.
impl<Req: RequestName> RequestName for Arc<Req> {
    fn name(&self) -> &str {
        (**self).name()
    }
}

impl<Req: RequestName> RequestName for Box<Req> {
    fn name(&self) -> &str {
        (**self).name()
    }
}

// Generates `RequestName` impls that return a constant label, ignoring the
// value itself.
macro_rules! impl_fixed_request_name {
    ($($ty:ty => $label:literal),+ $(,)?) => {
        $(
            impl RequestName for $ty {
                fn name(&self) -> &str {
                    $label
                }
            }
        )+
    };
}

impl_fixed_request_name! {
    String => "string",
    char => "char",
    () => "unit",
    i32 => "i32",
    u32 => "u32",
    i64 => "i64",
    u64 => "u64",
}
/// A response tied to a request by ID, generic over the success payload `T`.
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
#[cfg_attr(feature = "serde1", derive(serde::Serialize, serde::Deserialize))]
pub struct Response<T> {
/// ID of the request this response belongs to.
pub request_id: u64,
/// `Ok` with the payload on success, or `Err` with a [`ServerError`].
pub message: Result<T, ServerError>,
}
/// The error half of a [`Response`]: an [`io::ErrorKind`] plus a text detail.
///
/// The `Display` output follows the format string `"{kind:?}: {detail}"`, i.e.
/// the `Debug` form of `kind`, a colon and a space, then `detail`.
#[derive(thiserror::Error, Clone, Debug, PartialEq, Eq, Hash)]
#[error("{kind:?}: {detail}")]
#[non_exhaustive]
#[cfg_attr(feature = "serde1", derive(serde::Serialize, serde::Deserialize))]
pub struct ServerError {
/// The category of the error.
// With `serde1`, this field goes through the custom helpers in `util::serde`
// named below instead of a derived implementation.
// NOTE(review): the helper names indicate a `u32` encoding — confirm in `util::serde`.
#[cfg_attr(
feature = "serde1",
serde(serialize_with = "util::serde::serialize_io_error_kind_as_u32")
)]
#[cfg_attr(
feature = "serde1",
serde(deserialize_with = "util::serde::deserialize_io_error_kind_from_u32")
)]
pub kind: io::ErrorKind,
/// Free-form text shown after the kind in the `Display` output.
pub detail: String,
}
/// A transport failure, tagged by the operation that could not be completed.
///
/// Every variant holds the underlying error in an [`Arc`] and marks it with
/// `#[source]`. `E` may be unsized (`E: ?Sized`), which is what allows the
/// same enum to hold the `dyn Error` and `dyn Any` trait objects produced by
/// the cast helpers defined for it in this file.
#[derive(thiserror::Error, Debug, PartialEq, Eq)]
pub enum ChannelError<E>
where
E: ?Sized,
{
/// Reading from the transport failed.
#[error("could not read from the transport")]
Read(#[source] Arc<E>),
/// The transport could not be made ready for writes.
#[error("could not ready the transport for writes")]
Ready(#[source] Arc<E>),
/// Writing to the transport failed.
#[error("could not write to the transport")]
Write(#[source] Arc<E>),
/// Flushing the transport failed.
#[error("could not flush the transport")]
Flush(#[source] Arc<E>),
/// Closing the write end of the transport failed.
#[error("could not close the write end of the transport")]
Close(#[source] Arc<E>),
}
impl<E> Clone for ChannelError<E>
where
E: ?Sized,
{
fn clone(&self) -> Self {
use ChannelError::*;
match self {
Read(e) => Read(e.clone()),
Ready(e) => Ready(e.clone()),
Write(e) => Write(e.clone()),
Flush(e) => Flush(e.clone()),
Close(e) => Close(e.clone()),
}
}
}
impl<E> ChannelError<E>
where
E: Error + Send + Sync + 'static,
{
fn upcast_error(self) -> ChannelError<dyn Error + Send + Sync + 'static> {
use ChannelError::*;
match self {
Read(e) => Read(e),
Ready(e) => Ready(e),
Write(e) => Write(e),
Flush(e) => Flush(e),
Close(e) => Close(e),
}
}
}
impl<E> ChannelError<E>
where
E: Send + Sync + 'static,
{
fn upcast_any(self) -> ChannelError<dyn Any + Send + Sync + 'static> {
use ChannelError::*;
match self {
Read(e) => Read(e),
Ready(e) => Ready(e),
Write(e) => Write(e),
Flush(e) => Flush(e),
Close(e) => Close(e),
}
}
}
impl ChannelError<dyn Any + Send + Sync + 'static> {
fn downcast<E>(self) -> Result<ChannelError<E>, Self>
where
E: Any + Send + Sync,
{
use ChannelError::*;
match self {
Read(e) => e.downcast::<E>().map(Read).map_err(Read),
Ready(e) => e.downcast::<E>().map(Ready).map_err(Ready),
Write(e) => e.downcast::<E>().map(Write).map_err(Write),
Flush(e) => e.downcast::<E>().map(Flush).map_err(Flush),
Close(e) => e.downcast::<E>().map(Close).map_err(Close),
}
}
}
impl ServerError {
pub fn new(kind: io::ErrorKind, detail: String) -> ServerError {
Self { kind, detail }
}
}
impl<T> Request<T> {
pub fn deadline(&self) -> &Instant {
&self.context.deadline
}
}
// Every `ChannelError` variant must keep its tag through `upcast_any`, and
// `downcast` must give back the original payload type in the same variant.
#[test]
fn test_channel_any_casts() {
    use assert_matches::assert_matches;

    // Runs the same three steps for each listed variant.
    macro_rules! assert_any_round_trip {
        ($($variant:ident),+ $(,)?) => {
            $({
                let erased = ChannelError::$variant(Arc::new("")).upcast_any();
                assert_matches!(erased, ChannelError::$variant(_));
                assert_matches!(
                    erased.downcast::<&'static str>(),
                    Ok(ChannelError::$variant(_))
                );
            })+
        };
    }

    assert_any_round_trip!(Read, Ready, Write, Flush, Close);
}
// Every `ChannelError` variant must keep its tag through `upcast_error`.
#[test]
fn test_channel_error_upcast() {
    use assert_matches::assert_matches;
    use std::fmt;

    // Minimal error type used as the concrete payload.
    #[derive(Debug)]
    struct E;

    impl fmt::Display for E {
        fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
            f.write_str("E")
        }
    }

    impl Error for E {}

    // Runs the same check for each listed variant.
    macro_rules! assert_upcast_keeps_variant {
        ($($variant:ident),+ $(,)?) => {
            $(
                assert_matches!(
                    ChannelError::$variant(Arc::new(E)).upcast_error(),
                    ChannelError::$variant(_)
                );
            )+
        };
    }

    assert_upcast_keeps_variant!(Read, Ready, Write, Flush, Close);
}