use std::sync::Arc;
use std::sync::Mutex;
use bytes::Buf;
use bytes::Bytes;
use tokio::sync::broadcast;
use tokio::sync::mpsc;
use crate::client::CallOptions;
use crate::client::Invoke;
use crate::client::InvokeOnce;
use crate::client::RecvStream;
use crate::client::ResponseStreamItem;
use crate::client::SendOptions;
use crate::client::SendStream;
use crate::core::RecvMessage;
use crate::core::RequestHeaders;
use crate::core::SendMessage;
pub(crate) struct NopStream;
impl SendStream for NopStream {
async fn send(&mut self, _item: &dyn SendMessage, _options: SendOptions) -> Result<(), ()> {
Ok(())
}
}
impl RecvStream for NopStream {
async fn recv(&mut self, _msg: &mut dyn RecvMessage) -> ResponseStreamItem {
ResponseStreamItem::StreamClosed
}
}
#[derive(Clone)]
pub(crate) struct NopInvoker;
impl Invoke for NopInvoker {
type SendStream = NopStream;
type RecvStream = NopStream;
async fn invoke(
&self,
_headers: RequestHeaders,
_options: CallOptions,
) -> (Self::SendStream, Self::RecvStream) {
(NopStream, NopStream)
}
}
pub(crate) struct NopOnceInvoker;
impl InvokeOnce for NopOnceInvoker {
type SendStream = NopStream;
type RecvStream = NopStream;
async fn invoke_once(
self,
_headers: RequestHeaders,
_options: CallOptions,
) -> (Self::SendStream, Self::RecvStream) {
(NopStream, NopStream)
}
}
pub(crate) struct NopRecvMessage;
impl RecvMessage for NopRecvMessage {
fn decode(&mut self, _data: &mut dyn Buf) -> Result<(), String> {
Ok(())
}
}
pub(crate) struct ByteRecvMsg {
pub data: Option<Bytes>,
}
impl ByteRecvMsg {
pub fn new() -> Self {
Self { data: None }
}
}
impl RecvMessage for ByteRecvMsg {
fn decode(&mut self, data: &mut dyn Buf) -> Result<(), String> {
self.data = Some(data.copy_to_bytes(data.remaining()));
Ok(())
}
}
pub(crate) struct ByteSendMsg<'a> {
pub data: &'a Bytes,
}
impl<'a> ByteSendMsg<'a> {
pub fn new(data: &'a Bytes) -> Self {
Self { data }
}
}
impl<'a> SendMessage for ByteSendMsg<'a> {
fn encode(&self) -> Result<Box<dyn Buf + Send + Sync>, String> {
Ok(Box::new(self.data.clone()))
}
}
#[derive(Clone)]
pub(crate) struct MockInvoker {
pub req_headers: Arc<Mutex<Option<RequestHeaders>>>,
pub resp_tx: broadcast::Sender<ResponseStreamItem>,
pub req_tx: mpsc::Sender<(Bytes, SendOptions)>,
}
pub(crate) struct MockInvokerController {
pub resp_tx: broadcast::Sender<ResponseStreamItem>,
pub req_rx: mpsc::Receiver<(Bytes, SendOptions)>,
}
impl MockInvoker {
pub fn new() -> (Self, MockInvokerController) {
let (resp_tx, _) = broadcast::channel(100);
let (req_tx, req_rx) = mpsc::channel(100);
(
MockInvoker {
req_headers: Arc::new(Mutex::new(None)),
resp_tx: resp_tx.clone(),
req_tx,
},
MockInvokerController { req_rx, resp_tx },
)
}
}
impl MockInvokerController {
pub async fn recv_req(&mut self) -> (Bytes, SendOptions) {
self.req_rx.recv().await.unwrap()
}
pub async fn send_resp(&mut self, item: ResponseStreamItem) {
self.resp_tx.send(item).unwrap();
}
}
impl Invoke for MockInvoker {
type SendStream = MockSendStream;
type RecvStream = MockRecvStream;
async fn invoke(
&self,
headers: RequestHeaders,
_options: CallOptions,
) -> (Self::SendStream, Self::RecvStream) {
*self.req_headers.lock().unwrap() = Some(headers);
(
MockSendStream(self.req_tx.clone()),
MockRecvStream(self.resp_tx.subscribe()),
)
}
}
pub(crate) struct MockSendStream(pub mpsc::Sender<(Bytes, SendOptions)>);
impl SendStream for MockSendStream {
async fn send(&mut self, item: &dyn SendMessage, options: SendOptions) -> Result<(), ()> {
let mut data = item.encode().unwrap();
self.0
.send((data.copy_to_bytes(data.remaining()), options))
.await
.map_err(|_| ())
}
}
pub(crate) struct MockRecvStream(pub broadcast::Receiver<ResponseStreamItem>);
impl RecvStream for MockRecvStream {
async fn recv(&mut self, _msg: &mut dyn RecvMessage) -> ResponseStreamItem {
match self.0.recv().await {
Ok(item) => item,
Err(_) => ResponseStreamItem::StreamClosed,
}
}
}