use std::{
error::Error,
pin::Pin,
};
use crate::coding::{Rframe, Tframe, WireFormat};
use bytes::{BufMut, Bytes, BytesMut};
use futures::prelude::*;
pub trait Message: WireFormat + Send + Sync {}
pub trait JetStreamService<Req: Message, Resp: Message>:
Send + Sync + Sized
{
#[allow(clippy::type_complexity)]
fn call(
&mut self,
req: Req,
) -> Pin<
Box<
dyn Future<Output = Result<Resp, Box<dyn Error + Send + Sync>>>
+ Send,
>,
>;
}
pub trait NinePService:
JetStreamService<Tframe, Rframe> + Send + Sync + Clone + Clone
{
}
#[derive(Debug, Clone, Copy)]
pub struct NinePServiceImpl<S: NinePService> {
inner: S,
}
impl<S: NinePService> NinePServiceImpl<S> {
pub fn new(inner: S) -> Self {
NinePServiceImpl { inner }
}
}
impl<S: NinePService> JetStreamService<Tframe, Rframe> for NinePServiceImpl<S> {
fn call(
&mut self,
req: Tframe,
) -> Pin<
Box<
dyn Future<Output = Result<Rframe, Box<dyn Error + Send + Sync>>>
+ Send,
>,
> {
self.inner.call(req)
}
}
pub trait ConvertWireFormat: WireFormat {
fn to_bytes(&self) -> Bytes;
fn from_bytes(buf: &mut Bytes) -> Result<Self, std::io::Error>;
}
impl<T> ConvertWireFormat for T
where
T: WireFormat,
{
fn to_bytes(&self) -> Bytes {
let mut buf = vec![];
let res = self.encode(&mut buf);
if let Err(e) = res {
panic!("Failed to encode: {}", e);
}
let mut bytes = BytesMut::new();
bytes.put_slice(buf.as_slice());
bytes.freeze()
}
fn from_bytes(buf: &mut Bytes) -> Result<Self, std::io::Error> {
let buf = buf.to_vec();
T::decode(&mut buf.as_slice())
}
}