use thiserror::Error;
use tracing::{debug, warn};
use crate::miniprotocols::common::Point;
use crate::multiplexer;
use super::{Message, State};
/// Errors that the [`Client`] state machine can report.
///
/// The first four variants are protocol-usage errors detected locally from the
/// client's current state; `NoBlocks` is produced by the fetch helpers;
/// `Plexer` wraps a failure of the underlying multiplexer channel.
#[derive(Error, Debug)]
pub enum ClientError {
/// Returned by `recv_message` when the client is in a state where it holds
/// agency (i.e. it is expected to send, not to receive).
#[error("attempted to receive message while agency is ours")]
AgencyIsOurs,
/// Returned by `send_message` when the client does not hold agency in its
/// current state.
#[error("attempted to send message while agency is theirs")]
AgencyIsTheirs,
/// A message was received (or a stream ended) in a way that the current
/// state does not allow.
#[error("inbound message is not valid for current state")]
InvalidInbound,
/// The message passed to `send_message` is not one the current state
/// allows the client to send.
#[error("outbound message is not valid for current state")]
InvalidOutbound,
/// Returned by `fetch_single` / `fetch_range` when the peer answered the
/// range request with `Message::NoBlocks`.
#[error("requested range doesn't contain any blocks")]
NoBlocks,
/// Wraps the `multiplexer::Error` returned while sending or receiving on
/// the channel.
#[error("error while sending or receiving data through the multiplexer")]
Plexer(multiplexer::Error),
}
/// Raw bytes of a block, as carried in the `body` field of `Message::Block`.
pub type Body = Vec<u8>;
/// Pair of points delimiting a requested range of blocks.
///
/// NOTE(review): `fetch_single` passes the same point for both ends to obtain
/// one block, which suggests both bounds are inclusive — confirm against the
/// protocol definition.
pub type Range = (Point, Point);
/// Outcome of a range request: `Some(())` when the peer started a batch,
/// `None` when it reported that it has no blocks for the range.
pub type HasBlocks = Option<()>;
/// Client side of the mini-protocol state machine.
///
/// Field `0` is the current protocol [`State`]; field `1` is the buffered
/// multiplexer channel used to send and receive [`Message`]s.
pub struct Client(State, multiplexer::ChannelBuffer);
impl Client {
    /// Creates a client in [`State::Idle`], wrapping `channel` in a
    /// [`multiplexer::ChannelBuffer`].
    pub fn new(channel: multiplexer::AgentChannel) -> Self {
        Self(State::Idle, multiplexer::ChannelBuffer::new(channel))
    }

    /// Returns the current protocol state.
    pub fn state(&self) -> &State {
        &self.0
    }

    /// Returns `true` once the client has moved to [`State::Done`].
    pub fn is_done(&self) -> bool {
        self.0 == State::Done
    }

    /// Tells whether the client is the side allowed to send in the current
    /// state. Only [`State::Idle`] gives the client agency.
    fn has_agency(&self) -> bool {
        // Kept exhaustive (no `_` arm) so that adding a `State` variant forces
        // a decision here at compile time.
        match self.state() {
            State::Idle => true,
            State::Busy | State::Streaming | State::Done => false,
        }
    }

    /// Succeeds only when the client holds agency.
    ///
    /// # Errors
    /// [`ClientError::AgencyIsTheirs`] when the client does not hold agency.
    fn assert_agency_is_ours(&self) -> Result<(), ClientError> {
        if self.has_agency() {
            return Ok(());
        }

        Err(ClientError::AgencyIsTheirs)
    }

    /// Succeeds only when the client does not hold agency.
    ///
    /// # Errors
    /// [`ClientError::AgencyIsOurs`] when the client holds agency.
    fn assert_agency_is_theirs(&self) -> Result<(), ClientError> {
        if self.has_agency() {
            return Err(ClientError::AgencyIsOurs);
        }

        Ok(())
    }

    /// Checks that `msg` is one the client may send in the current state.
    ///
    /// # Errors
    /// [`ClientError::InvalidOutbound`] for any other state/message pair.
    fn assert_outbound_state(&self, msg: &Message) -> Result<(), ClientError> {
        match (&self.0, msg) {
            (State::Idle, Message::RequestRange { .. }) => Ok(()),
            (State::Idle, Message::ClientDone) => Ok(()),
            _ => Err(ClientError::InvalidOutbound),
        }
    }

    /// Checks that `msg` is one the client may receive in the current state.
    ///
    /// # Errors
    /// [`ClientError::InvalidInbound`] for any other state/message pair.
    fn assert_inbound_state(&self, msg: &Message) -> Result<(), ClientError> {
        match (&self.0, msg) {
            (State::Busy, Message::StartBatch) => Ok(()),
            (State::Busy, Message::NoBlocks) => Ok(()),
            (State::Streaming, Message::Block { .. }) => Ok(()),
            (State::Streaming, Message::BatchDone) => Ok(()),
            _ => Err(ClientError::InvalidInbound),
        }
    }

    /// Sends `msg` through the channel after validating agency and state.
    ///
    /// This method does not change the protocol state; the higher-level
    /// helpers (`send_request_range`, `send_done`) do that after a successful
    /// send.
    ///
    /// # Errors
    /// * [`ClientError::AgencyIsTheirs`] when the client does not hold agency.
    /// * [`ClientError::InvalidOutbound`] when `msg` is not allowed in the
    ///   current state.
    /// * [`ClientError::Plexer`] when the channel fails to send.
    pub async fn send_message(&mut self, msg: &Message) -> Result<(), ClientError> {
        self.assert_agency_is_ours()?;
        self.assert_outbound_state(msg)?;

        self.1
            .send_msg_chunks(msg)
            .await
            .map_err(ClientError::Plexer)?;

        Ok(())
    }

    /// Receives the next message from the channel and validates it against
    /// the current state.
    ///
    /// This method does not change the protocol state; the `recv_while_*`
    /// helpers do that based on the message returned.
    ///
    /// # Errors
    /// * [`ClientError::AgencyIsOurs`] when the client holds agency.
    /// * [`ClientError::InvalidInbound`] when the client is already done, or
    ///   when the received message is not allowed in the current state.
    /// * [`ClientError::Plexer`] when the channel fails to receive.
    pub async fn recv_message(&mut self) -> Result<Message, ClientError> {
        self.assert_agency_is_theirs()?;

        // `Done` passes the agency check above (the client has no agency
        // there), yet `assert_inbound_state` accepts no message at all in that
        // state. Waiting on the channel could therefore only end in
        // `InvalidInbound`, a channel error, or never complete — fail fast
        // instead of awaiting.
        if self.is_done() {
            return Err(ClientError::InvalidInbound);
        }

        let msg = self.1.recv_full_msg().await.map_err(ClientError::Plexer)?;
        self.assert_inbound_state(&msg)?;

        Ok(msg)
    }

    /// Sends a `RequestRange` message for `range` and moves to
    /// [`State::Busy`].
    ///
    /// # Errors
    /// Propagates any error from [`Client::send_message`]; the state is left
    /// untouched in that case.
    pub async fn send_request_range(&mut self, range: Range) -> Result<(), ClientError> {
        let request = Message::RequestRange { range };
        self.send_message(&request).await?;
        self.0 = State::Busy;

        Ok(())
    }

    /// Waits for the peer's answer to a range request.
    ///
    /// Returns `Some(())` and moves to [`State::Streaming`] on `StartBatch`;
    /// returns `None` and moves back to [`State::Idle`] on `NoBlocks`.
    ///
    /// # Errors
    /// Propagates any error from [`Client::recv_message`], and returns
    /// [`ClientError::InvalidInbound`] for any other message.
    pub async fn recv_while_busy(&mut self) -> Result<HasBlocks, ClientError> {
        let reply = self.recv_message().await?;

        match reply {
            Message::StartBatch => {
                debug!("batch start");
                self.0 = State::Streaming;
                Ok(Some(()))
            }
            Message::NoBlocks => {
                warn!("no blocks");
                self.0 = State::Idle;
                Ok(None)
            }
            _ => Err(ClientError::InvalidInbound),
        }
    }

    /// Requests `range` and waits for the peer to either start a batch or
    /// report that it has no blocks.
    ///
    /// # Errors
    /// Propagates errors from [`Client::send_request_range`] and
    /// [`Client::recv_while_busy`].
    pub async fn request_range(&mut self, range: Range) -> Result<HasBlocks, ClientError> {
        self.send_request_range(range).await?;
        debug!("range requested");

        self.recv_while_busy().await
    }

    /// Receives the next item of a running batch.
    ///
    /// Returns `Some(body)` for each `Block` message. On `BatchDone` it moves
    /// back to [`State::Idle`] and returns `None`.
    ///
    /// # Errors
    /// Propagates any error from [`Client::recv_message`], and returns
    /// [`ClientError::InvalidInbound`] for any other message.
    pub async fn recv_while_streaming(&mut self) -> Result<Option<Body>, ClientError> {
        debug!("waiting for stream");
        let reply = self.recv_message().await?;

        match reply {
            Message::Block { body } => Ok(Some(body)),
            Message::BatchDone => {
                self.0 = State::Idle;
                Ok(None)
            }
            _ => Err(ClientError::InvalidInbound),
        }
    }

    /// Fetches the single block identified by `point` by requesting the range
    /// `(point, point)`.
    ///
    /// # Errors
    /// * [`ClientError::NoBlocks`] when the peer has no blocks for the range.
    /// * [`ClientError::InvalidInbound`] when the batch ends before any block
    ///   arrives, or when more than one block arrives.
    /// * Any error propagated from the underlying send/receive calls.
    pub async fn fetch_single(&mut self, point: Point) -> Result<Body, ClientError> {
        // The range needs two owned points, hence the single clone.
        self.request_range((point.clone(), point))
            .await?
            .ok_or(ClientError::NoBlocks)?;

        let body = self
            .recv_while_streaming()
            .await?
            .ok_or(ClientError::InvalidInbound)?;

        debug!("body received");

        // Exactly one block is expected, so the next message must close the
        // batch.
        match self.recv_while_streaming().await? {
            None => Ok(body),
            Some(_) => Err(ClientError::InvalidInbound),
        }
    }

    /// Fetches every block of `range`, collecting the bodies in the order
    /// they are received.
    ///
    /// # Errors
    /// * [`ClientError::NoBlocks`] when the peer has no blocks for the range.
    /// * Any error propagated from the underlying send/receive calls; bodies
    ///   received before the error are dropped.
    pub async fn fetch_range(&mut self, range: Range) -> Result<Vec<Body>, ClientError> {
        self.request_range(range)
            .await?
            .ok_or(ClientError::NoBlocks)?;

        let mut bodies = Vec::new();

        while let Some(body) = self.recv_while_streaming().await? {
            debug!("body received");
            bodies.push(body);
        }

        Ok(bodies)
    }

    /// Sends `ClientDone` and moves to [`State::Done`].
    ///
    /// # Errors
    /// Propagates any error from [`Client::send_message`]; the state is left
    /// untouched in that case.
    pub async fn send_done(&mut self) -> Result<(), ClientError> {
        let farewell = Message::ClientDone;
        self.send_message(&farewell).await?;
        self.0 = State::Done;

        Ok(())
    }
}