use crate::error::{RpcError, RpcResult};
use async_trait::async_trait;
use serde::{Deserialize, Serialize};
use std::{borrow::Cow, fmt};
pub use crate::document::{
decode_document, decode_number, encode_document, encode_document_ref, encode_number, Document,
DocumentRef, Number,
};
/// A single RPC message: a method name plus the serialized argument bytes.
///
/// Both fields may borrow from the caller for the lifetime `'m`, so building a
/// `Message` does not require copying the method name or the payload.
#[derive(Debug)]
pub struct Message<'m> {
/// Method name carried by the message.
pub method: &'m str,
/// Serialized argument bytes, either borrowed or owned.
pub arg: Cow<'m, [u8]>,
}
/// Per-call context handed to `Transport::send` and `MessageDispatch::dispatch`
/// alongside the message.
///
/// `Context::default()` leaves both fields as `None`.
#[derive(Default, Debug, Clone)]
pub struct Context {
/// Optional actor string.
/// NOTE(review): presumably identifies the actor making the call — confirm with callers.
pub actor: Option<String>,
/// Optional span string.
/// NOTE(review): presumably a tracing span identifier — confirm with callers.
pub span: Option<String>,
}
/// Optional flags that accompany a message handed to a transport.
///
/// Both flags are `false` in `SendOpts::default()`; use the chained setters to
/// change them, e.g. `SendOpts::default().idempotent(true)`.
#[derive(Default, Debug)]
pub struct SendOpts {
    /// Flag set through [`SendOpts::idempotent`].
    pub idempotent: bool,
    /// Flag set through [`SendOpts::read_only`].
    pub read_only: bool,
}

impl SendOpts {
    /// Returns these options with `idempotent` replaced by `val`.
    #[must_use]
    pub fn idempotent(self, val: bool) -> SendOpts {
        SendOpts { idempotent: val, ..self }
    }

    /// Returns these options with `read_only` replaced by `val`.
    #[must_use]
    pub fn read_only(self, val: bool) -> SendOpts {
        SendOpts { read_only: val, ..self }
    }
}
/// Abstraction over the channel that delivers a [`Message`] and yields the
/// response bytes.
///
/// Implementors must be `Send + Sync + Clone`.
#[async_trait]
pub trait Transport: Send + Sync + Clone {
/// Sends `req` together with its call context and optional send options.
///
/// On success returns the raw response bytes; on failure returns an
/// [`RpcError`].
async fn send(
&self,
ctx: &Context,
req: Message<'_>,
opts: Option<SendOpts>,
) -> Result<Vec<u8>, RpcError>;
/// Sets the transport's timeout to `interval`.
/// NOTE(review): presumably applies to subsequent `send` calls — confirm
/// against the implementations.
fn set_timeout(&self, interval: std::time::Duration);
}
/// Decodes a msgpack-encoded byte slice into a `T`.
///
/// # Errors
/// Returns [`RpcError::Deser`] carrying the decoder's error text when `buf`
/// cannot be decoded as `T`.
pub fn deserialize<'de, T: Deserialize<'de>>(buf: &'de [u8]) -> Result<T, RpcError> {
    match rmp_serde::from_slice(buf) {
        Ok(value) => Ok(value),
        Err(err) => Err(RpcError::Deser(err.to_string())),
    }
}
/// Encodes `data` as msgpack using `rmp_serde::to_vec_named`.
///
/// # Errors
/// Returns [`RpcError::Ser`] carrying the encoder's error text when `data`
/// cannot be serialized.
pub fn serialize<T: Serialize>(data: &T) -> Result<Vec<u8>, RpcError> {
    match rmp_serde::to_vec_named(data) {
        Ok(bytes) => Ok(bytes),
        Err(err) => Err(RpcError::Ser(err.to_string())),
    }
}
/// Receiving side of an RPC: turns an incoming [`Message`] into response bytes.
#[async_trait]
pub trait MessageDispatch {
/// Handles `message` in the given call context.
///
/// Returns the response bytes on success, or an [`RpcError`] on failure.
async fn dispatch(&self, ctx: &Context, message: Message<'_>) -> Result<Vec<u8>, RpcError>;
}
/// Encoding of a message payload, as reported by `message_format` or chosen
/// when emitting a header with [`MessageFormat::write_header`].
#[derive(Clone, Eq, PartialEq)]
pub enum MessageFormat {
    /// Payload is msgpack.
    Msgpack,
    /// Payload is CBOR.
    Cbor,
    /// Payload has zero length.
    Empty,
    /// Payload starts with a byte that maps to no known format.
    Unknown,
}

impl fmt::Display for MessageFormat {
    /// Writes the lowercase name of the format (`"msgpack"`, `"cbor"`,
    /// `"empty"` or `"unknown"`).
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.write_str(match self {
            MessageFormat::Msgpack => "msgpack",
            MessageFormat::Cbor => "cbor",
            MessageFormat::Empty => "empty",
            MessageFormat::Unknown => "unknown",
        })
    }
}

impl fmt::Debug for MessageFormat {
    /// Produces the same text as `Display`, without building an intermediate
    /// `String`.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        fmt::Display::fmt(self, f)
    }
}

impl MessageFormat {
    /// Writes the one-byte header that marks a payload as this format.
    ///
    /// `Cbor` writes `0x7f` and `Msgpack` writes `0xc1`. `Empty` and `Unknown`
    /// have no header, so nothing is written for them.
    ///
    /// Returns the number of header bytes written (`1` or `0`).
    ///
    /// # Errors
    /// Returns the underlying I/O error if the header byte cannot be written
    /// in full. A writer that accepts zero bytes yields
    /// `std::io::ErrorKind::WriteZero` rather than a silent `Ok(0)`.
    pub fn write_header<W: std::io::Write>(&self, mut buf: W) -> std::io::Result<usize> {
        let marker: u8 = match self {
            MessageFormat::Cbor => 0x7f,
            MessageFormat::Msgpack => 0xc1,
            // No header for these: leave the writer untouched.
            MessageFormat::Empty | MessageFormat::Unknown => return Ok(0),
        };
        // `write_all` retries on `Interrupted` and rejects short writes, so a
        // successful return guarantees the header byte really was emitted.
        buf.write_all(&[marker])?;
        Ok(1)
    }
}
/// Inspects the start of `data` and reports its encoding together with the
/// offset at which the payload proper begins.
///
/// Rules, applied in order:
/// - empty input is `Empty`, offset `0`;
/// - a single byte is `Msgpack`, offset `0`, whatever its value;
/// - a leading `0x7f` is `Cbor`, offset `1` (skip the header byte);
/// - a leading `0xc1` is `Msgpack`, offset `1` (skip the header byte);
/// - a leading `0x00..=0x7e` or `0xc0` is `Unknown`, offset `0`;
/// - anything else is `Msgpack`, offset `0` (no header byte).
pub fn message_format(data: &[u8]) -> (MessageFormat, usize) {
    match data {
        [] => (MessageFormat::Empty, 0),
        // Must precede the marker arms: a lone byte is never treated as a header.
        [_] => (MessageFormat::Msgpack, 0),
        [0x7f, ..] => (MessageFormat::Cbor, 1),
        [0xc1, ..] => (MessageFormat::Msgpack, 1),
        [0x00..=0x7e | 0xc0, ..] => (MessageFormat::Unknown, 0),
        _ => (MessageFormat::Msgpack, 0),
    }
}
/// Signature of a CBOR decode callback: given a mutable `crate::cbor::Decoder`
/// over input of any lifetime, it returns a `T` or an error. Used by `decode`.
pub type CborDecodeFn<T> = dyn Fn(&mut crate::cbor::Decoder<'_>) -> RpcResult<T>;
/// Signature of a CBOR decode callback whose decoder input lifetime is fixed to
/// `'de` instead of being left open. Used by `decode_borrowed` and
/// `decode_owned`.
pub type CborDecodeFnLt<'de, T> = dyn Fn(&mut crate::cbor::Decoder<'de>) -> RpcResult<T>;
/// Decodes `buf` into a `T`, choosing the decoder from the payload's format.
///
/// The format and payload offset come from `message_format`:
/// - CBOR payloads are decoded by calling `cbor_dec` on a decoder that starts
///   at the reported offset;
/// - msgpack payloads are decoded with `deserialize`, also from that offset.
///
/// # Errors
/// - whatever error `cbor_dec` returns for a CBOR payload;
/// - `RpcError::Deser("decoding: <cause>")` when msgpack decoding fails;
/// - `RpcError::Deser("invalid encoding")` for any other format (empty or
///   unknown input).
pub fn decode<T: serde::de::DeserializeOwned>(
    buf: &[u8],
    cbor_dec: &CborDecodeFn<T>,
) -> RpcResult<T> {
    match message_format(buf) {
        (MessageFormat::Cbor, offset) => {
            let mut decoder = crate::cbor::Decoder::new(&buf[offset..]);
            cbor_dec(&mut decoder)
        }
        // The previous message, "decoding '{e}': {{}}", put the error inside
        // the quotes and ended with a literal "{}".
        (MessageFormat::Msgpack, offset) => deserialize(&buf[offset..])
            .map_err(|e| RpcError::Deser(format!("decoding: {e}"))),
        _ => Err(RpcError::Deser("invalid encoding".to_string())),
    }
}
/// Decodes a CBOR payload from `buf` with `cbor_dec`, where the decoder's
/// input lifetime is tied to `buf` (`'de`).
///
/// Only CBOR is accepted: the payload offset reported by `message_format` is
/// skipped and the rest is handed to `cbor_dec`.
///
/// # Errors
/// Returns the error from `cbor_dec`, or
/// `RpcError::Deser("invalid encoding (borrowed)")` when `buf` is not detected
/// as CBOR.
pub fn decode_borrowed<'de, T>(
    buf: &'de [u8],
    cbor_dec: &'de CborDecodeFnLt<'de, T>,
) -> RpcResult<T> {
    if let (MessageFormat::Cbor, start) = message_format(buf) {
        let mut decoder = crate::cbor::Decoder::new(&buf[start..]);
        return cbor_dec(&mut decoder);
    }
    Err(RpcError::Deser("invalid encoding (borrowed)".to_string()))
}
/// Decodes a CBOR payload from `buf` with `cbor_dec` and returns the decoded
/// value.
///
/// Only CBOR is accepted: the payload offset reported by `message_format` is
/// skipped and the rest is handed to `cbor_dec`.
///
/// # Errors
/// Returns the error from `cbor_dec`, or
/// `RpcError::Deser("invalid encoding (owned)")` when `buf` is not detected as
/// CBOR.
pub fn decode_owned<'vin, T: Clone>(
    buf: &'vin [u8],
    cbor_dec: &CborDecodeFnLt<'vin, T>,
) -> RpcResult<T> {
    match message_format(buf) {
        (MessageFormat::Cbor, offset) => {
            let mut decoder = crate::cbor::Decoder::new(&buf[offset..]);
            cbor_dec(&mut decoder)
        }
        // The message used to read "(borrowed)", copied from `decode_borrowed`,
        // which made the two failure sites indistinguishable.
        _ => Err(RpcError::Deser("invalid encoding (owned)".to_string())),
    }
}
/// Generic RPC sender that encodes arguments, pushes them through a
/// [`Transport`], and decodes the response.
pub struct AnySender<T: Transport> {
    /// Transport used to deliver every message.
    transport: T,
}

impl<T: Transport> AnySender<T> {
    /// Wraps `transport` in a new sender.
    pub fn new(transport: T) -> Self {
        AnySender { transport }
    }
}
impl<T: Transport> AnySender<T> {
    /// Hands an already-encoded message to the transport without send options
    /// and returns the raw response bytes.
    #[inline]
    async fn send_raw(&self, ctx: &Context, msg: Message<'_>) -> RpcResult<Vec<u8>> {
        self.transport.send(ctx, msg, None).await
    }

    /// Sends `arg` to `method`, serializing it with `minicbor_ser`, and decodes
    /// the response into `Out`.
    ///
    /// The request payload is the CBOR header byte followed by the serialized
    /// argument. The response bytes are passed to `minicbor_ser::from_slice`
    /// as received.
    ///
    /// # Errors
    /// - `RpcError::Ser` if the argument cannot be serialized;
    /// - any error returned by the transport;
    /// - `RpcError::Deser` if the response cannot be decoded as `Out`.
    pub async fn send<In: Serialize, Out: serde::de::DeserializeOwned>(
        &self,
        ctx: &Context,
        method: &str,
        arg: &In,
    ) -> RpcResult<Out> {
        let mut payload = Vec::new();
        MessageFormat::Cbor.write_header(&mut payload).unwrap();
        if let Err(err) = minicbor_ser::to_writer(arg, &mut payload) {
            return Err(RpcError::Ser(err.to_string()));
        }

        let request = Message { method, arg: Cow::Borrowed(&payload) };
        let reply = self.send_raw(ctx, request).await?;

        minicbor_ser::from_slice(&reply).map_err(|err| RpcError::Deser(err.to_string()))
    }

    /// Sends `arg` to `method`, encoding it with `minicbor`, and decodes the
    /// response into `Out` with `minicbor` as well.
    ///
    /// The request payload is the CBOR header byte followed by the encoded
    /// argument. The response bytes are passed to the decoder as received.
    ///
    /// # Errors
    /// - `RpcError::Ser` if the argument cannot be encoded;
    /// - any error returned by the transport;
    /// - `RpcError::Deser` if the response cannot be decoded as `Out`.
    pub async fn send_cbor<'de, In: minicbor::Encode<()>, Out: crate::cbor::MDecodeOwned<()>>(
        &mut self,
        ctx: &Context,
        method: &str,
        arg: &In,
    ) -> RpcResult<Out> {
        let mut payload = Vec::new();
        MessageFormat::Cbor.write_header(&mut payload).unwrap();
        if let Err(err) = crate::minicbor::encode(arg, &mut payload) {
            return Err(RpcError::Ser(err.to_string()));
        }

        let request = Message { method, arg: Cow::Borrowed(&payload) };
        let reply = self.send_raw(ctx, request).await?;

        crate::minicbor::decode(&reply).map_err(|err| RpcError::Deser(err.to_string()))
    }
}
/// Alias for the unit type `()`.
pub type Unit = ();