use crate::commands::auth::AuthCommand;
use crate::commands::builder::CommandBuilder;
use crate::commands::hello::{HelloCommand, HelloResponse};
use crate::commands::Command;
use crate::network::buffer::Network;
use crate::network::future::Future;
use crate::network::handler::{ConnectionError, Credentials};
use crate::network::protocol::{Protocol, Resp3};
use crate::network::timeout::{Timeout, TimeoutError};
use crate::subscription::client::{Error, Subscription};
use crate::subscription::messages::ToPushMessage;
use alloc::string::String;
use bytes::Bytes;
use core::fmt::{Debug, Formatter};
use embedded_nal::TcpClientStack;
use embedded_time::duration::Microseconds;
use embedded_time::Clock;
/// Errors reported when a command is sent or when its response is awaited.
///
/// Most variants are constructed outside this file; only the notes below are
/// established by the code visible here.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum CommandErrors {
    Timeout,
    EncodingCommandFailed,
    ProtocolViolation,
    InvalidFuture,
    TcpError,
    /// Produced when a `TimeoutError` is converted into a `CommandErrors`.
    TimerError,
    CommandResponseViolation,
    /// Carries an error text.
    // NOTE(review): presumably the error message sent back by the server — confirm.
    ErrorResponse(String),
    MemoryFull,
}
/// Client that sends commands through a [`Network`] and hands back futures
/// for their responses.
///
/// Type parameters:
/// * `N` - TCP client stack.
/// * `C` - clock used to create command timeouts.
/// * `P` - protocol; determines the frame type commands are encoded to.
pub struct Client<'a, N, C, P>
where
    N: TcpClientStack,
    C: Clock,
    P: Protocol,
    HelloCommand: Command<P::FrameType>,
{
    /// Network layer through which commands are sent.
    pub(crate) network: Network<'a, N, P>,
    /// Optional clock passed to `Timeout::new` for every timeout that is created.
    pub(crate) clock: Option<&'a C>,
    /// Duration passed to `Timeout::new` for every timeout that is created.
    pub(crate) timeout_duration: Microseconds,
    /// Optional reference to the response of the `HelloCommand`.
    pub(crate) hello_response: Option<&'a <HelloCommand as Command<P::FrameType>>::Response>,
}
impl<'a, N: TcpClientStack, C: Clock, P: Protocol> Client<'a, N, C, P>
where
AuthCommand: Command<<P as Protocol>::FrameType>,
HelloCommand: Command<<P as Protocol>::FrameType>,
{
pub fn send<Cmd>(&'a self, command: Cmd) -> Result<Future<N, C, P, Cmd>, CommandErrors>
where
Cmd: Command<P::FrameType>,
{
let id = self.network.send(command.encode())?;
Ok(Future::new(
id,
command,
self.network.get_protocol(),
&self.network,
Timeout::new(self.clock, self.timeout_duration)?,
))
}
pub fn subscribe<const L: usize>(
self,
channels: [Bytes; L],
) -> Result<Subscription<'a, N, C, P, L>, Error>
where
<P as Protocol>::FrameType: ToPushMessage,
<P as Protocol>::FrameType: From<CommandBuilder>,
{
Subscription::new(self, channels).subscribe()
}
pub(crate) fn auth(&'a self, credentials: Option<Credentials>) -> Result<(), ConnectionError> {
if credentials.is_some() {
self.send(AuthCommand::from(credentials.as_ref().unwrap()))
.map_err(auth_error)?
.wait()
.map_err(auth_error)?;
}
Ok(())
}
pub(crate) fn init(
&'a self,
credentials: Option<Credentials>,
) -> Result<Option<<HelloCommand as Command<<P as Protocol>::FrameType>>::Response>, ConnectionError>
{
self.auth(credentials)?;
if self.network.get_protocol().requires_hello() {
return Ok(Some(
self.send(HelloCommand {}).map_err(hello_error)?.wait().map_err(hello_error)?,
));
}
Ok(None)
}
pub fn close(&self) {
if !self.network.remaining_dropped_futures() {
return;
}
let timer = match Timeout::new(self.clock, self.timeout_duration) {
Ok(timer) => timer,
Err(_) => {
return;
}
};
while self.network.remaining_dropped_futures() && !timer.expired().unwrap_or(true) {
self.network.handle_dropped_futures();
}
}
}
impl<'a, N: TcpClientStack, C: Clock> Client<'a, N, C, Resp3> {
    /// Returns the stored response of the hello command.
    ///
    /// # Panics
    ///
    /// Panics if the client holds no hello response (`hello_response` is `None`).
    pub fn get_hello_response(&self) -> &HelloResponse {
        let stored = self.hello_response.as_ref();
        stored.unwrap()
    }
}
impl From<TimeoutError> for CommandErrors {
fn from(_: TimeoutError) -> Self {
CommandErrors::TimerError
}
}
/// Wraps a command error in `ConnectionError::AuthenticationError`.
///
/// Has the shape expected by `Result::map_err`, so it can be passed by name.
fn auth_error(error: CommandErrors) -> ConnectionError {
    ConnectionError::AuthenticationError(error)
}
/// Wraps a command error in `ConnectionError::ProtocolSwitchError`.
///
/// Has the shape expected by `Result::map_err`, so it can be passed by name.
// NOTE(review): `Client::init` in this file references this function, so the
// `dead_code` allowance may be obsolete — confirm before removing it.
#[allow(dead_code)]
fn hello_error(error: CommandErrors) -> ConnectionError {
    ConnectionError::ProtocolSwitchError(error)
}
/// Debug output for [`Client`].
///
/// Only `network` and `timeout_duration` are printed; `clock` and
/// `hello_response` are left out.
impl<'a, N: TcpClientStack, C: Clock, P: Protocol> Debug for Client<'a, N, C, P>
where
    HelloCommand: Command<<P as Protocol>::FrameType>,
{
    fn fmt(&self, f: &mut Formatter<'_>) -> core::fmt::Result {
        let mut builder = f.debug_struct("Client");
        builder.field("network", &self.network);
        builder.field("timeout_duration", &self.timeout_duration);
        builder.finish()
    }
}