use crate::protocol::message::{
ClientMessage, ConfigMsg, DownloadEvent, ErrorMsg, GetConfig, HubMsg, Ping, SendMsg,
SendResponse, ServerMessage, ServiceQueryMsg, ServiceQueryResultMsg, SetConfigMsg, StreamEvent,
StreamMsg, SubscribeDownloads, SubscribeTasks, client_message, download_event, server_message,
stream_event, task_event,
};
use anyhow::Result;
use futures_core::Stream;
use futures_util::StreamExt;
pub trait Client: Send {
fn request(
&mut self,
msg: ClientMessage,
) -> impl std::future::Future<Output = Result<ServerMessage>> + Send;
fn request_stream(
&mut self,
msg: ClientMessage,
) -> impl Stream<Item = Result<ServerMessage>> + Send + '_;
fn send(
&mut self,
req: SendMsg,
) -> impl std::future::Future<Output = Result<SendResponse>> + Send {
async move { SendResponse::try_from(self.request(req.into()).await?) }
}
fn stream(
&mut self,
req: StreamMsg,
) -> impl Stream<Item = Result<stream_event::Event>> + Send + '_ {
self.request_stream(req.into())
.take_while(|r| {
std::future::ready(!matches!(
r,
Ok(ServerMessage {
msg: Some(server_message::Msg::Stream(StreamEvent {
event: Some(stream_event::Event::End(_))
}))
})
))
})
.map(|r| r.and_then(stream_event::Event::try_from))
}
fn hub(
&mut self,
req: HubMsg,
) -> impl Stream<Item = Result<download_event::Event>> + Send + '_ {
self.request_stream(ClientMessage {
msg: Some(client_message::Msg::Hub(req)),
})
.take_while(|r| {
std::future::ready(!matches!(
r,
Ok(ServerMessage {
msg: Some(server_message::Msg::Download(DownloadEvent {
event: Some(download_event::Event::Completed(_))
}))
})
))
})
.map(|r| r.and_then(download_event::Event::try_from))
}
fn ping(&mut self) -> impl std::future::Future<Output = Result<()>> + Send {
async move {
match self
.request(ClientMessage {
msg: Some(client_message::Msg::Ping(Ping {})),
})
.await?
{
ServerMessage {
msg: Some(server_message::Msg::Pong(_)),
} => Ok(()),
ServerMessage {
msg: Some(server_message::Msg::Error(ErrorMsg { code, message })),
} => {
anyhow::bail!("server error ({code}): {message}")
}
other => anyhow::bail!("unexpected response: {other:?}"),
}
}
}
fn subscribe_tasks(&mut self) -> impl Stream<Item = Result<task_event::Event>> + Send + '_ {
self.request_stream(ClientMessage {
msg: Some(client_message::Msg::SubscribeTasks(SubscribeTasks {})),
})
.map(|r| r.and_then(task_event::Event::try_from))
}
fn subscribe_downloads(
&mut self,
) -> impl Stream<Item = Result<download_event::Event>> + Send + '_ {
self.request_stream(ClientMessage {
msg: Some(client_message::Msg::SubscribeDownloads(
SubscribeDownloads {},
)),
})
.map(|r| r.and_then(download_event::Event::try_from))
}
fn get_config(&mut self) -> impl std::future::Future<Output = Result<String>> + Send {
async move {
match self
.request(ClientMessage {
msg: Some(client_message::Msg::GetConfig(GetConfig {})),
})
.await?
{
ServerMessage {
msg: Some(server_message::Msg::Config(ConfigMsg { config })),
} => Ok(config),
ServerMessage {
msg: Some(server_message::Msg::Error(ErrorMsg { code, message })),
} => {
anyhow::bail!("server error ({code}): {message}")
}
other => anyhow::bail!("unexpected response: {other:?}"),
}
}
}
fn set_config(
&mut self,
config: String,
) -> impl std::future::Future<Output = Result<()>> + Send {
async move {
match self
.request(ClientMessage {
msg: Some(client_message::Msg::SetConfig(SetConfigMsg { config })),
})
.await?
{
ServerMessage {
msg: Some(server_message::Msg::Pong(_)),
} => Ok(()),
ServerMessage {
msg: Some(server_message::Msg::Error(ErrorMsg { code, message })),
} => {
anyhow::bail!("server error ({code}): {message}")
}
other => anyhow::bail!("unexpected response: {other:?}"),
}
}
}
fn service_query(
&mut self,
service: String,
query: String,
) -> impl std::future::Future<Output = Result<String>> + Send {
async move {
match self
.request(ClientMessage {
msg: Some(client_message::Msg::ServiceQuery(ServiceQueryMsg {
service,
query,
})),
})
.await?
{
ServerMessage {
msg:
Some(server_message::Msg::ServiceQueryResult(ServiceQueryResultMsg { result })),
} => Ok(result),
ServerMessage {
msg: Some(server_message::Msg::Error(ErrorMsg { code, message })),
} => {
anyhow::bail!("server error ({code}): {message}")
}
other => anyhow::bail!("unexpected response: {other:?}"),
}
}
}
}