use std::fmt;
use std::marker::PhantomData;
use std::sync::Arc;
use bytes::{BufMut, Bytes, BytesMut};
use rand::prelude::*;
use serde::{Deserialize, Serialize};
use futures::stream::StreamExt;
use crate::error::*;
use crate::stream::*;
use crate::util::*;
pub type RequestPriority = u8;
pub const PRIO_HIGH: RequestPriority = 0x20;
pub const PRIO_NORMAL: RequestPriority = 0x40;
pub const PRIO_BACKGROUND: RequestPriority = 0x80;
pub const PRIO_PRIMARY: RequestPriority = 0x00;
pub const PRIO_SECONDARY: RequestPriority = 0x01;
#[derive(Clone, Copy, Serialize, Deserialize, Debug)]
pub struct OrderTag(pub(crate) u64, pub(crate) u64);
#[derive(Clone, Copy)]
pub struct OrderTagStream(u64);
impl OrderTag {
pub fn stream() -> OrderTagStream {
OrderTagStream(thread_rng().gen())
}
}
impl OrderTagStream {
pub fn order(&self, order: u64) -> OrderTag {
OrderTag(self.0, order)
}
}
pub trait Message: Serialize + for<'de> Deserialize<'de> + Send + Sync + 'static {
type Response: Serialize + for<'de> Deserialize<'de> + Send + Sync + 'static;
}
pub struct Req<M: Message> {
pub(crate) msg: Arc<M>,
pub(crate) msg_ser: Option<Bytes>,
pub(crate) stream: AttachedStream,
pub(crate) order_tag: Option<OrderTag>,
}
impl<M: Message> Req<M> {
pub fn new(v: M) -> Result<Self, Error> {
Ok(v.into_req()?)
}
pub fn with_stream_from_buffer(self, b: Bytes) -> Self {
Self {
stream: AttachedStream::Fixed(b),
..self
}
}
pub fn with_stream(self, b: ByteStream) -> Self {
Self {
stream: AttachedStream::Stream(b),
..self
}
}
pub fn with_order_tag(self, order_tag: OrderTag) -> Self {
Self {
order_tag: Some(order_tag),
..self
}
}
pub fn msg(&self) -> &M {
&self.msg
}
pub fn take_stream(&mut self) -> Option<ByteStream> {
std::mem::replace(&mut self.stream, AttachedStream::None).into_stream()
}
pub(crate) fn into_enc(
self,
prio: RequestPriority,
path: Bytes,
telemetry_id: Bytes,
) -> ReqEnc {
ReqEnc {
prio,
path,
telemetry_id,
msg: self.msg_ser.unwrap(),
stream: self.stream.into_stream(),
order_tag: self.order_tag,
}
}
pub(crate) fn from_enc(enc: ReqEnc) -> Result<Self, rmp_serde::decode::Error> {
let msg = rmp_serde::decode::from_slice(&enc.msg)?;
Ok(Req {
msg: Arc::new(msg),
msg_ser: Some(enc.msg),
stream: enc
.stream
.map(AttachedStream::Stream)
.unwrap_or(AttachedStream::None),
order_tag: enc.order_tag,
})
}
}
pub trait IntoReq<M: Message> {
fn into_req(self) -> Result<Req<M>, rmp_serde::encode::Error>;
fn into_req_local(self) -> Req<M>;
}
impl<M: Message> IntoReq<M> for M {
fn into_req(self) -> Result<Req<M>, rmp_serde::encode::Error> {
let msg_ser = rmp_to_vec_all_named(&self)?;
Ok(Req {
msg: Arc::new(self),
msg_ser: Some(Bytes::from(msg_ser)),
stream: AttachedStream::None,
order_tag: None,
})
}
fn into_req_local(self) -> Req<M> {
Req {
msg: Arc::new(self),
msg_ser: None,
stream: AttachedStream::None,
order_tag: None,
}
}
}
impl<M: Message> IntoReq<M> for Req<M> {
fn into_req(self) -> Result<Req<M>, rmp_serde::encode::Error> {
Ok(self)
}
fn into_req_local(self) -> Req<M> {
self
}
}
impl<M: Message> Clone for Req<M> {
fn clone(&self) -> Self {
let stream = match &self.stream {
AttachedStream::None => AttachedStream::None,
AttachedStream::Fixed(b) => AttachedStream::Fixed(b.clone()),
AttachedStream::Stream(_) => {
panic!("Cannot clone a Req<_> with a non-buffer attached stream")
}
};
Self {
msg: self.msg.clone(),
msg_ser: self.msg_ser.clone(),
stream,
order_tag: self.order_tag,
}
}
}
impl<M> fmt::Debug for Req<M>
where
M: Message + fmt::Debug,
{
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> {
write!(f, "Req[{:?}", self.msg)?;
match &self.stream {
AttachedStream::None => write!(f, "]"),
AttachedStream::Fixed(b) => write!(f, "; stream=buf:{}]", b.len()),
AttachedStream::Stream(_) => write!(f, "; stream]"),
}
}
}
pub struct Resp<M: Message> {
pub(crate) _phantom: PhantomData<M>,
pub(crate) msg: M::Response,
pub(crate) stream: AttachedStream,
pub(crate) order_tag: Option<OrderTag>,
}
impl<M: Message> Resp<M> {
pub fn new(v: M::Response) -> Self {
Resp {
_phantom: Default::default(),
msg: v,
stream: AttachedStream::None,
order_tag: None,
}
}
pub fn with_stream_from_buffer(self, b: Bytes) -> Self {
Self {
stream: AttachedStream::Fixed(b),
..self
}
}
pub fn with_stream(self, b: ByteStream) -> Self {
Self {
stream: AttachedStream::Stream(b),
..self
}
}
pub fn with_order_tag(self, order_tag: OrderTag) -> Self {
Self {
order_tag: Some(order_tag),
..self
}
}
pub fn msg(&self) -> &M::Response {
&self.msg
}
pub fn into_msg(self) -> M::Response {
self.msg
}
pub fn into_parts(self) -> (M::Response, Option<ByteStream>) {
(self.msg, self.stream.into_stream())
}
pub(crate) fn into_enc(self) -> Result<RespEnc, rmp_serde::encode::Error> {
Ok(RespEnc {
msg: rmp_to_vec_all_named(&self.msg)?.into(),
stream: self.stream.into_stream(),
order_tag: self.order_tag,
})
}
pub(crate) fn from_enc(enc: RespEnc) -> Result<Self, Error> {
let msg = rmp_serde::decode::from_slice(&enc.msg)?;
Ok(Self {
_phantom: Default::default(),
msg,
stream: enc
.stream
.map(AttachedStream::Stream)
.unwrap_or(AttachedStream::None),
order_tag: enc.order_tag,
})
}
}
impl<M> fmt::Debug for Resp<M>
where
M: Message,
<M as Message>::Response: fmt::Debug,
{
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> {
write!(f, "Resp[{:?}", self.msg)?;
match &self.stream {
AttachedStream::None => write!(f, "]"),
AttachedStream::Fixed(b) => write!(f, "; stream=buf:{}]", b.len()),
AttachedStream::Stream(_) => write!(f, "; stream]"),
}
}
}
pub(crate) enum AttachedStream {
None,
Fixed(Bytes),
Stream(ByteStream),
}
impl AttachedStream {
pub fn into_stream(self) -> Option<ByteStream> {
match self {
AttachedStream::None => None,
AttachedStream::Fixed(b) => Some(Box::pin(futures::stream::once(async move { Ok(b) }))),
AttachedStream::Stream(s) => Some(s),
}
}
}
pub(crate) struct ReqEnc {
pub(crate) prio: RequestPriority,
pub(crate) path: Bytes,
pub(crate) telemetry_id: Bytes,
pub(crate) msg: Bytes,
pub(crate) stream: Option<ByteStream>,
pub(crate) order_tag: Option<OrderTag>,
}
impl ReqEnc {
pub(crate) fn encode(self) -> (ByteStream, Option<OrderTag>) {
let mut buf = BytesMut::with_capacity(
self.path.len() + self.telemetry_id.len() + self.msg.len() + 16,
);
buf.put_u8(self.prio);
buf.put_u8(self.path.len() as u8);
buf.put(self.path);
buf.put_u8(self.telemetry_id.len() as u8);
buf.put(&self.telemetry_id[..]);
buf.put_u32(self.msg.len() as u32);
let header = buf.freeze();
let res_stream: ByteStream = if let Some(stream) = self.stream {
Box::pin(futures::stream::iter([Ok(header), Ok(self.msg)]).chain(stream))
} else {
Box::pin(futures::stream::iter([Ok(header), Ok(self.msg)]))
};
(res_stream, self.order_tag)
}
pub(crate) async fn decode(stream: ByteStream) -> Result<Self, Error> {
Self::decode_aux(stream)
.await
.map_err(read_exact_error_to_error)
}
async fn decode_aux(stream: ByteStream) -> Result<Self, ReadExactError> {
let mut reader = ByteStreamReader::new(stream);
let prio = reader.read_u8().await?;
let path_len = reader.read_u8().await?;
let path = reader.read_exact(path_len as usize).await?;
let telemetry_id_len = reader.read_u8().await?;
let telemetry_id = reader.read_exact(telemetry_id_len as usize).await?;
let msg_len = reader.read_u32().await?;
let msg = reader.read_exact(msg_len as usize).await?;
Ok(Self {
prio,
path,
telemetry_id,
msg,
stream: Some(reader.into_stream()),
order_tag: None,
})
}
}
pub(crate) struct RespEnc {
msg: Bytes,
stream: Option<ByteStream>,
order_tag: Option<OrderTag>,
}
impl RespEnc {
pub(crate) fn encode(resp: Result<Self, Error>) -> (ByteStream, Option<OrderTag>) {
match resp {
Ok(Self {
msg,
stream,
order_tag,
}) => {
let mut buf = BytesMut::with_capacity(4);
buf.put_u32(msg.len() as u32);
let header = buf.freeze();
let res_stream: ByteStream = if let Some(stream) = stream {
Box::pin(futures::stream::iter([Ok(header), Ok(msg)]).chain(stream))
} else {
Box::pin(futures::stream::iter([Ok(header), Ok(msg)]))
};
(res_stream, order_tag)
}
Err(err) => {
let err = std::io::Error::new(
std::io::ErrorKind::Other,
format!("netapp error: {}", err),
);
(
Box::pin(futures::stream::once(async move { Err(err) })),
None,
)
}
}
}
pub(crate) async fn decode(stream: ByteStream) -> Result<Self, Error> {
Self::decode_aux(stream)
.await
.map_err(read_exact_error_to_error)
}
async fn decode_aux(stream: ByteStream) -> Result<Self, ReadExactError> {
let mut reader = ByteStreamReader::new(stream);
let msg_len = reader.read_u32().await?;
let msg = reader.read_exact(msg_len as usize).await?;
reader.fill_buffer().await;
Ok(Self {
msg,
stream: Some(reader.into_stream()),
order_tag: None,
})
}
}
fn read_exact_error_to_error(e: ReadExactError) -> Error {
match e {
ReadExactError::Stream(err) => Error::Remote(err.kind(), err.to_string()),
ReadExactError::UnexpectedEos => Error::Framing,
}
}