use std::io::{self, BufReader};
use std::time::Duration;
use bytes::BytesMut;
use super::errors::Error;
use super::frame::{Frame, ReqBuf};
use super::stream_ext::StreamExt;
/// Client that sends requests over a stream `S` and reads response frames
/// back from it.
pub struct StreamClient<S>
where
    S: StreamExt,
{
    /// Id that will be assigned to the next outgoing request.
    id: u64,
    /// Buffered reader around the underlying stream; writes go to the inner
    /// stream directly via `get_mut()`.
    stream: BufReader<S>,
}
impl<S> StreamClient<S>
where
    S: StreamExt,
{
    /// Creates a client around `stream`.
    ///
    /// The request id counter starts at 0 and reads from the stream are
    /// buffered through a 32 KiB `BufReader`.
    pub fn new(stream: S) -> Self {
        // Capacity of the read-side buffer, in bytes.
        const READ_BUFFER_BYTES: usize = 1024 * 32;

        let stream = BufReader::with_capacity(READ_BUFFER_BYTES, stream);
        Self { id: 0, stream }
    }
}
impl<S> StreamClient<S>
where
    S: StreamExt,
{
    /// Sets the read timeout of the underlying stream to `timeout`.
    ///
    /// # Errors
    ///
    /// Returns the I/O error reported by the stream's `set_read_timeout`.
    pub fn set_timeout(&mut self, timeout: Duration) -> io::Result<()> {
        // The timeout is applied to the wrapped stream, not to the buffer.
        let inner = self.stream.get_mut();
        inner.set_read_timeout(timeout)
    }
}
impl<S: StreamExt> StreamClient<S> {
pub fn call_service(&mut self, req: ReqBuf) -> Result<Frame, Error> {
let id = self.id;
self.id += 1;
info!("request id = {}", id);
self.stream.get_mut().write_all(&(req.finish(id)))?;
let mut buf = BytesMut::with_capacity(1024 * 32);
loop {
let rsp_frame = Frame::decode_from(&mut self.stream, &mut buf)
.map_err(|e| Error::ClientDeserialize(e.to_string()))?;
if rsp_frame.id == id {
info!("get response id = {}", id);
return Ok(rsp_frame);
}
}
}
}