mod codec;
#[cfg(test)]
mod tests;
mod types;
mod util;
use futures::{
future::{self, Either, FutureExt},
lock::{Mutex, MutexGuard},
stream::StreamExt,
};
use log::{debug, error, info, trace};
use owning_ref::{OwningRef, OwningRefMut};
use pin_utils::pin_mut;
use rand;
use rand::seq::SliceRandom;
use std::{
collections::HashMap,
io::ErrorKind,
mem,
net::Shutdown,
sync::Arc,
time::{Duration, Instant},
};
use tokio::{
codec::FramedRead,
io::AsyncWriteExt,
net::tcp::{
split::{TcpStreamReadHalf, TcpStreamWriteHalf},
TcpStream,
},
sync::{
mpsc, oneshot,
watch::{self, Sender as WatchSender},
},
timer::{self, Timeout},
};
use uuid::Uuid;
use crate::{
codec::Codec,
error::{Error, Result},
types::{
ClientControl, ConnectionState, ServerMessage, StableMutexGuard, StateTransition,
StateTransitionResult,
},
};
pub use tokio::sync::{mpsc::Receiver as MpscReceiver, watch::Receiver as WatchReceiver};
pub use crate::types::{
error, Address, Authorization, ClientRef, ClientRefMut, ClientState, Connect, Info, Msg,
ProtocolError, Sid, Subject, Subscription,
};
pub type DelayGenerator = Box<dyn Fn(&Client, u64, u64) -> Duration + Send>;
pub fn generate_delay_generator(
connect_series_attempts_before_cool_down: u64,
connect_delay: Duration,
connect_series_delay: Duration,
cool_down: Duration,
) -> DelayGenerator {
Box::new(move |_: &Client, connect_attempts: u64, addresses: u64| {
if connect_attempts % (addresses * connect_series_attempts_before_cool_down) == 0 {
trace!("Using cool down delay {}s", cool_down.as_secs_f32());
cool_down
} else if connect_attempts % addresses == 0 {
trace!(
"Using connect series delay {}s",
connect_series_delay.as_secs_f32()
);
connect_series_delay
} else {
trace!("Using connect delay {}s", connect_delay.as_secs_f32());
connect_delay
}
})
}
#[derive(Clone)]
pub struct Client {
sync: Arc<Mutex<SyncClient>>,
}
impl Client {
pub fn new(addresses: Vec<Address>) -> Self {
Self::with_connect(addresses, Connect::new())
}
pub fn with_connect(addresses: Vec<Address>, connect: Connect) -> Self {
Self {
sync: Arc::new(Mutex::new(SyncClient::with_connect(addresses, connect))),
}
}
pub async fn state(&self) -> ClientState {
self.lock().await.state()
}
pub async fn state_stream(&self) -> WatchReceiver<ClientState> {
self.lock().await.state_stream()
}
pub async fn info(&self) -> Info {
self.lock().await.info()
}
pub async fn connect_mut(&self) -> ClientRefMut<'_, Connect> {
ClientRefMut(
OwningRefMut::new(StableMutexGuard(self.lock().await)).map_mut(|c| c.connect_mut()),
)
}
pub async fn addresses_mut(&self) -> ClientRefMut<'_, [Address]> {
ClientRefMut(
OwningRefMut::new(StableMutexGuard(self.lock().await)).map_mut(|c| c.addresses_mut()),
)
}
pub async fn tcp_connect_timeout(&self) -> Duration {
self.lock().await.tcp_connect_timeout()
}
pub async fn set_tcp_connect_timeout(&self, tcp_connect_timeout: Duration) -> &Self {
self.lock()
.await
.set_tcp_connect_timeout(tcp_connect_timeout);
self
}
pub async fn delay_generator_mut(&self) -> ClientRefMut<'_, DelayGenerator> {
ClientRefMut(
OwningRefMut::new(StableMutexGuard(self.lock().await))
.map_mut(|c| c.delay_generator_mut()),
)
}
pub async fn sids(&self) -> Vec<Sid> {
self.lock()
.await
.subscriptions()
.map(|(sid, _)| *sid)
.collect()
}
pub async fn subscription(&self, sid: Sid) -> Option<ClientRef<'_, Subscription>> {
let client = self.lock().await;
if client.subscriptions.contains_key(&sid) {
Some(ClientRef(
OwningRef::new(StableMutexGuard(client))
.map(|c| c.subscriptions.get(&sid).unwrap()),
))
} else {
None
}
}
pub async fn send_connect(&self) -> Result<()> {
let mut client = self.lock().await;
client.send_connect().await
}
pub async fn connect(&self) {
SyncClient::connect(Self::clone(self)).await
}
pub async fn disconnect(&self) {
SyncClient::disconnect(Arc::clone(&self.sync)).await
}
pub async fn publish(&self, subject: &Subject, payload: &[u8]) -> Result<()> {
self.publish_with_optional_reply(subject, None, payload)
.await
}
pub async fn publish_with_reply(
&self,
subject: &Subject,
reply_to: &Subject,
payload: &[u8],
) -> Result<()> {
self.publish_with_optional_reply(subject, Some(reply_to), payload)
.await
}
pub async fn publish_with_optional_reply(
&self,
subject: &Subject,
reply_to: Option<&Subject>,
payload: &[u8],
) -> Result<()> {
let mut client = self.lock().await;
client
.publish_with_optional_reply(subject, reply_to, payload)
.await
}
pub async fn request(&self, subject: &Subject, payload: &[u8]) -> Result<Msg> {
SyncClient::request(Arc::clone(&self.sync), subject, payload).await
}
pub async fn subscribe(
&self,
subject: &Subject,
buffer: usize,
) -> Result<(Sid, MpscReceiver<Msg>)> {
self.subscribe_with_optional_queue_group(subject, None, buffer)
.await
}
pub async fn subscribe_with_queue_group(
&self,
subject: &Subject,
queue_group: &str,
buffer: usize,
) -> Result<(Sid, MpscReceiver<Msg>)> {
self.subscribe_with_optional_queue_group(subject, Some(queue_group), buffer)
.await
}
pub async fn subscribe_with_optional_queue_group(
&self,
subject: &Subject,
queue_group: Option<&str>,
buffer: usize,
) -> Result<(Sid, MpscReceiver<Msg>)> {
let mut client = self.lock().await;
client
.subscribe_with_optional_queue_group(subject, queue_group, buffer)
.await
}
pub async fn unsubscribe(&self, sid: Sid) -> Result<()> {
self.unsubscribe_optional_max_msgs(sid, None).await
}
pub async fn unsubscribe_with_max_msgs(&self, sid: Sid, max_msgs: u64) -> Result<()> {
self.unsubscribe_optional_max_msgs(sid, Some(max_msgs))
.await
}
pub async fn unsubscribe_optional_max_msgs(
&self,
sid: Sid,
max_msgs: Option<u64>,
) -> Result<()> {
let mut client = self.lock().await;
client.unsubscribe_optional_max_msgs(sid, max_msgs).await
}
pub async fn unsubscribe_all(&self) -> Result<()> {
let unsubscribes = self
.sids()
.await
.into_iter()
.map(|sid| self.unsubscribe(sid));
future::try_join_all(unsubscribes).await?;
Ok(())
}
pub async fn info_stream(&self) -> WatchReceiver<Info> {
self.lock().await.info_stream()
}
pub async fn ping_stream(&self) -> WatchReceiver<()> {
self.lock().await.ping_stream()
}
pub async fn pong_stream(&self) -> WatchReceiver<()> {
self.lock().await.pong_stream()
}
pub async fn ok_stream(&self) -> WatchReceiver<()> {
self.lock().await.ok_stream()
}
pub async fn err_stream(&self) -> WatchReceiver<ProtocolError> {
self.lock().await.err_stream()
}
pub async fn ping(&self) -> Result<()> {
let mut client = self.lock().await;
client.ping().await
}
pub async fn pong(&self) -> Result<()> {
let mut client = self.lock().await;
client.pong().await
}
pub async fn ping_pong(&self) -> Result<()> {
SyncClient::ping_pong(Arc::clone(&self.sync)).await
}
async fn lock(&self) -> MutexGuard<'_, SyncClient> {
self.sync.lock().await
}
}
impl Drop for Client {
fn drop(&mut self) {
trace!("Client was dropped");
}
}
struct SyncClient {
addresses: Vec<Address>,
connect: Connect,
state: ConnectionState,
state_tx: WatchSender<ClientState>,
state_rx: WatchReceiver<ClientState>,
info_tx: WatchSender<Info>,
info_rx: WatchReceiver<Info>,
ping_tx: WatchSender<()>,
ping_rx: WatchReceiver<()>,
pong_tx: WatchSender<()>,
pong_rx: WatchReceiver<()>,
ok_tx: WatchSender<()>,
ok_rx: WatchReceiver<()>,
err_tx: WatchSender<ProtocolError>,
err_rx: WatchReceiver<ProtocolError>,
tcp_connect_timeout: Duration,
delay_generator: DelayGenerator,
subscriptions: HashMap<Sid, Subscription>,
}
impl SyncClient {
fn with_connect(addresses: Vec<Address>, connect: Connect) -> Self {
let state = ConnectionState::Disconnected;
let (state_tx, state_rx) = watch::channel((&state).into());
let (info_tx, info_rx) = watch::channel(Info::new());
let (ping_tx, ping_rx) = watch::channel(());
let (pong_tx, pong_rx) = watch::channel(());
let (ok_tx, ok_rx) = watch::channel(());
let (err_tx, err_rx) = watch::channel(ProtocolError::UnknownProtocolOperation);
Self {
addresses,
connect,
state,
state_tx,
state_rx,
info_tx,
info_rx,
ping_tx,
ping_rx,
pong_tx,
pong_rx,
ok_tx,
ok_rx,
err_tx,
err_rx,
tcp_connect_timeout: util::DEFAULT_TCP_CONNECT_TIMEOUT,
delay_generator: generate_delay_generator(
util::DEFAULT_CONNECT_SERIES_ATTEMPTS_BEFORE_COOL_DOWN,
util::DEFAULT_CONNECT_DELAY,
util::DEFAULT_CONNECT_SERIES_DELAY,
util::DEFAULT_COOL_DOWN,
),
subscriptions: HashMap::new(),
}
}
fn state(&self) -> ClientState {
self.state_rx.get_ref().clone()
}
fn state_stream(&self) -> WatchReceiver<ClientState> {
self.state_rx.clone()
}
pub fn info(&self) -> Info {
self.info_rx.get_ref().clone()
}
fn connect_mut(&mut self) -> &mut Connect {
&mut self.connect
}
fn addresses_mut(&mut self) -> &mut [Address] {
&mut self.addresses
}
fn tcp_connect_timeout(&self) -> Duration {
self.tcp_connect_timeout
}
fn set_tcp_connect_timeout(&mut self, tcp_connect_timeout: Duration) -> &mut Self {
self.tcp_connect_timeout = tcp_connect_timeout;
self
}
fn delay_generator_mut(&mut self) -> &mut DelayGenerator {
&mut self.delay_generator
}
fn subscriptions(&self) -> impl Iterator<Item = (&Sid, &Subscription)> {
self.subscriptions.iter()
}
async fn send_connect(&mut self) -> Result<()> {
if let ConnectionState::Connected(address, writer) = &mut self.state {
Self::send_connect_with_writer(writer, &self.connect, address).await
} else {
Err(Error::NotConnected)
}
}
#[allow(clippy::cognitive_complexity)]
async fn connect(wrapped_client: Client) {
if let ConnectionState::Connected(_, _) = wrapped_client.lock().await.state {
return;
}
let (addresses_len, mut addresses_iter) = {
let client = wrapped_client.lock().await;
let mut addresses = client
.addresses
.iter()
.chain(client.info_rx.get_ref().connect_urls().iter())
.cloned()
.collect::<Vec<_>>();
let addresses_len = addresses.len() as u64;
addresses.shuffle(&mut rand::thread_rng());
(addresses_len, addresses.into_iter().cycle())
};
let mut connect_attempts = 0;
loop {
if connect_attempts != 0 {
let delay = (wrapped_client.lock().await.delay_generator)(
&wrapped_client,
connect_attempts,
addresses_len,
);
debug!(
"Delaying for {}s after {} connect attempts with {} addresses",
delay.as_secs_f32(),
connect_attempts,
addresses_len
);
timer::delay(Instant::now() + delay).await;
}
connect_attempts += 1;
let mut client = wrapped_client.lock().await;
match client.state {
ConnectionState::Connected(_, _) => {
return;
}
ConnectionState::Disconnecting(_) => {
client.state_transition(StateTransition::ToDisconnected);
return;
}
_ => (),
}
let address = if let Some(address) = addresses_iter.next() {
address
} else {
error!("No addresses to connect to");
continue;
};
client.state_transition(StateTransition::ToConnecting(address.clone()));
let connect = Timeout::new(
TcpStream::connect(address.address()),
client.tcp_connect_timeout,
);
let (reader, mut writer) = match connect.await {
Ok(Ok(sink_and_stream)) => sink_and_stream.split(),
Ok(Err(e)) => {
error!("Failed to connect to '{}', err: {}", address, e);
continue;
}
Err(_) => {
error!("Timed out while connecting to '{}'", address);
continue;
}
};
let mut reader = FramedRead::new(reader, Codec::new());
let wait_for_info = Timeout::new(reader.next(), client.tcp_connect_timeout);
match wait_for_info.await {
Ok(Some(Ok(Ok(message)))) => {
if let ServerMessage::Info(info) = message {
client.handle_info_message(info);
} else {
error!(
"First message should be {} instead received '{:?}'",
util::INFO_OP_NAME,
message
);
debug_assert!(false);
continue;
}
}
Ok(Some(Ok(Err(e)))) => {
error!("Received invalid server message, err: {}", e);
continue;
}
Ok(Some(Err(e))) => {
error!("TCP socket error, err: {}", e);
continue;
}
Ok(None) => {
error!("TCP socket was disconnected");
continue;
}
Err(_) => {
error!("Timed out waiting for {} message", util::INFO_OP_NAME);
continue;
}
}
if let Err(e) =
Self::send_connect_with_writer(&mut writer, &client.connect, &address).await
{
error!("Failed to send connect message, err: {}", e);
continue;
}
let mut failed_to_resubscribe = Vec::new();
for (sid, subscription) in &client.subscriptions {
if let Err(e) =
Self::write_line(&mut writer, ClientControl::Sub(subscription)).await
{
error!(
"Failed to resubscribe to sid '{}' with subject '{}', err: {}",
sid,
subscription.subject(),
e
);
failed_to_resubscribe.push(*sid);
}
}
client
.subscriptions
.retain(|sid, _| !failed_to_resubscribe.contains(&sid));
tokio::spawn(Self::type_erased_server_messages_handler(
Client::clone(&wrapped_client),
reader,
));
client.state_transition(StateTransition::ToConnected(writer));
return;
}
}
async fn disconnect(wrapped_client: Arc<Mutex<Self>>) {
let (tx, rx) = oneshot::channel();
{
let mut client = wrapped_client.lock().await;
if let ConnectionState::Disconnected = client.state {
return;
}
let mut state_stream = client.state_stream();
tokio::spawn(async move {
while let Some(state) = state_stream.next().await {
if state.is_disconnected() {
tx.send(()).expect("to send disconnect signal");
break;
}
}
});
client.state_transition(StateTransition::ToDisconnecting);
}
rx.await.expect("to receive disconnect signal");
}
async fn publish_with_reply(
&mut self,
subject: &Subject,
reply_to: &Subject,
payload: &[u8],
) -> Result<()> {
self.publish_with_optional_reply(subject, Some(reply_to), payload)
.await
}
async fn publish_with_optional_reply(
&mut self,
subject: &Subject,
reply_to: Option<&Subject>,
payload: &[u8],
) -> Result<()> {
let max_payload = self.info().max_payload;
if payload.len() > max_payload {
return Err(Error::ExceedsMaxPayload {
tried: payload.len(),
limit: max_payload,
});
}
if let ConnectionState::Connected(_, writer) = &mut self.state {
Self::write_line(writer, ClientControl::Pub(subject, reply_to, payload.len())).await?;
writer.write_all(payload).await?;
writer
.write_all(util::MESSAGE_TERMINATOR.as_bytes())
.await?;
Ok(())
} else {
Err(Error::NotConnected)
}
}
async fn request(
wrapped_client: Arc<Mutex<Self>>,
subject: &Subject,
payload: &[u8],
) -> Result<Msg> {
let inbox_uuid = Uuid::new_v4();
let reply_to = format!("{}.{}", util::INBOX_PREFIX, inbox_uuid.to_simple()).parse()?;
let mut rx = {
let mut client = wrapped_client.lock().await;
let (sid, rx) = client.subscribe(&reply_to, 1).await?;
client.unsubscribe_with_max_msgs(sid, 1).await?;
client
.publish_with_reply(subject, &reply_to, payload)
.await?;
rx
};
Ok(rx.next().await.ok_or(Error::NoResponse)?)
}
async fn subscribe(
&mut self,
subject: &Subject,
buffer: usize,
) -> Result<(Sid, MpscReceiver<Msg>)> {
self.subscribe_with_optional_queue_group(subject, None, buffer)
.await
}
async fn subscribe_with_optional_queue_group(
&mut self,
subject: &Subject,
queue_group: Option<&str>,
buffer: usize,
) -> Result<(Sid, MpscReceiver<Msg>)> {
if let ConnectionState::Connected(_, writer) = &mut self.state {
let (tx, rx) = mpsc::channel(buffer);
let subscription =
Subscription::new(subject.clone(), queue_group.map(String::from), tx);
Self::write_line(writer, ClientControl::Sub(&subscription)).await?;
let sid = subscription.sid();
self.subscriptions.insert(sid, subscription);
Ok((sid, rx))
} else {
Err(Error::NotConnected)
}
}
async fn unsubscribe(&mut self, sid: Sid) -> Result<()> {
self.unsubscribe_optional_max_msgs(sid, None).await
}
async fn unsubscribe_with_max_msgs(&mut self, sid: Sid, max_msgs: u64) -> Result<()> {
self.unsubscribe_optional_max_msgs(sid, Some(max_msgs))
.await
}
async fn unsubscribe_optional_max_msgs(
&mut self,
sid: Sid,
max_msgs: Option<u64>,
) -> Result<()> {
if let ConnectionState::Connected(_, writer) = &mut self.state {
let subscription = match self.subscriptions.get_mut(&sid) {
Some(subscription) => subscription,
None => return Err(Error::UnknownSid(sid)),
};
subscription.unsubscribe_after = max_msgs;
Self::write_line(writer, ClientControl::Unsub(sid, max_msgs)).await?;
if subscription.unsubscribe_after.is_none() {
self.subscriptions.remove(&sid);
}
Ok(())
} else {
Err(Error::NotConnected)
}
}
pub fn info_stream(&mut self) -> WatchReceiver<Info> {
self.info_rx.clone()
}
pub fn ping_stream(&mut self) -> WatchReceiver<()> {
self.ping_rx.clone()
}
pub fn pong_stream(&mut self) -> WatchReceiver<()> {
self.pong_rx.clone()
}
pub fn ok_stream(&mut self) -> WatchReceiver<()> {
self.ok_rx.clone()
}
pub fn err_stream(&mut self) -> WatchReceiver<ProtocolError> {
self.err_rx.clone()
}
async fn ping(&mut self) -> Result<()> {
if let ConnectionState::Connected(_, writer) = &mut self.state {
Self::write_line(writer, ClientControl::Ping).await?;
Ok(())
} else {
Err(Error::NotConnected)
}
}
async fn pong(&mut self) -> Result<()> {
if let ConnectionState::Connected(_, writer) = &mut self.state {
Self::write_line(writer, ClientControl::Pong).await?;
Ok(())
} else {
Err(Error::NotConnected)
}
}
async fn ping_pong(wrapped_client: Arc<Mutex<Self>>) -> Result<()> {
let mut pong_stream = {
let mut client = wrapped_client.lock().await;
let mut pong_stream = client.pong_stream();
pong_stream.next().now_or_never();
client.ping().await?;
pong_stream
};
pong_stream.next().await;
Ok(())
}
async fn server_messages_handler(
wrapped_client: Client,
mut reader: FramedRead<TcpStreamReadHalf, Codec>,
) {
let disconnecting = Self::disconnecting(Arc::clone(&wrapped_client.sync));
pin_mut!(disconnecting);
loop {
let message_result = match future::select(reader.next(), disconnecting).await {
Either::Left((Some(message), unresolved_disconnecting)) => {
disconnecting = unresolved_disconnecting;
message
}
Either::Left((None, _)) => {
error!("TCP socket was disconnected");
break;
}
Either::Right(((), _)) => break,
};
match message_result {
Ok(Ok(message)) => {
Self::handle_server_message(Arc::clone(&wrapped_client.sync), message).await;
}
Ok(Err(e)) => {
error!("Received invalid server message, err: {}", e);
continue;
}
Err(e) => {
error!("TCP socket error, err: {}", e);
break;
}
};
}
let mut client = wrapped_client.lock().await;
let should_reconnect = !client.state().is_disconnecting();
if let StateTransitionResult::Writer(writer) =
client.state_transition(StateTransition::ToDisconnected)
{
match reader.into_inner().reunite(writer) {
Ok(tcp) => {
if let Err(e) = tcp.shutdown(Shutdown::Both) {
if e.kind() != ErrorKind::NotConnected {
error!("Failed to shutdown TCP stream, err: {}", e);
}
}
}
Err(e) => error!("Failed to reunite TCP stream, err: {}", e),
}
} else {
error!("Disconnected with no TCP writer. Unable to shutdown TCP stream.");
debug_assert!(false);
}
if should_reconnect {
tokio::spawn(Self::connect(Client::clone(&wrapped_client)));
}
}
#[allow(clippy::cognitive_complexity)]
async fn handle_server_message(wrapped_client: Arc<Mutex<Self>>, message: ServerMessage) {
match message {
ServerMessage::Info(info) => {
wrapped_client.lock().await.handle_info_message(info);
}
ServerMessage::Msg(msg) => {
let sid = msg.sid();
let mut client = wrapped_client.lock().await;
let subscription = match client.subscriptions.get_mut(&sid) {
Some(subscription) => subscription,
None => {
error!("Received unknown sid '{}'", sid);
debug_assert!(false);
let wrapped_client = Arc::clone(&wrapped_client);
tokio::spawn(async move {
info!("Unsubscribing from unknown sid '{}'", sid);
let mut client = wrapped_client.lock().await;
if let Err(e) = client.unsubscribe(sid).await {
error!("Failed to unsubscribe from '{}', err: {}", sid, e);
}
});
return;
}
};
let subject = msg.subject().clone();
if let Err(e) = subscription.tx.try_send(msg) {
if e.is_closed() {
let wrapped_client = Arc::clone(&wrapped_client);
tokio::spawn(async move {
info!("Unsubscribing from closed sid '{}'", sid);
let mut client = wrapped_client.lock().await;
if let Err(e) = client.unsubscribe(sid).await {
error!("Failed to unsubscribe from sid '{}', err: {}", sid, e);
}
});
} else {
error!(
"Failed to send msg to sid '{}' with subject '{}', err: {}",
sid, subject, e
);
}
}
if let Some(unsubscribe_after) = &mut subscription.unsubscribe_after() {
*unsubscribe_after -= 1;
if *unsubscribe_after == 0 {
client.subscriptions.remove(&sid);
}
}
}
ServerMessage::Ping => {
if let Err(e) = wrapped_client.lock().await.ping_tx.broadcast(()) {
error!("Failed to broadcast {}, err: {}", util::PING_OP_NAME, e);
}
let wrapped_client = Arc::clone(&wrapped_client);
tokio::spawn(async move {
let mut client = wrapped_client.lock().await;
if let Err(e) = client.pong().await {
error!("Failed to send {}, err: {}", util::PONG_OP_NAME, e);
}
});
}
ServerMessage::Pong => {
if let Err(e) = wrapped_client.lock().await.pong_tx.broadcast(()) {
error!("Failed to broadcast {}, err: {}", util::PONG_OP_NAME, e);
}
}
ServerMessage::Ok => {
if let Err(e) = wrapped_client.lock().await.ok_tx.broadcast(()) {
error!("Failed to broadcast {}, err: {}", util::OK_OP_NAME, e);
}
}
ServerMessage::Err(e) => {
error!("Protocol error, err: '{}'", e);
if let Err(e) = wrapped_client.lock().await.err_tx.broadcast(e) {
error!("Failed to broadcast {}, err: {}", util::ERR_OP_NAME, e);
}
}
}
}
fn handle_info_message(&mut self, info: Info) {
if let Err(e) = self.info_tx.broadcast(info) {
error!("Failed to broadcast {}, err: {}", util::INFO_OP_NAME, e);
}
}
fn type_erased_server_messages_handler(
wrapped_client: Client,
reader: FramedRead<TcpStreamReadHalf, Codec>,
) -> impl std::future::Future<Output = ()> + Send {
Self::server_messages_handler(wrapped_client, reader)
}
async fn disconnecting(wrapped_client: Arc<Mutex<Self>>) {
let mut state_stream = wrapped_client.lock().await.state_stream();
while let Some(state) = state_stream.next().await {
if state.is_disconnecting() {
break;
}
}
}
async fn write_line(
writer: &mut TcpStreamWriteHalf,
control_line: ClientControl<'_>,
) -> Result<()> {
let line = control_line.to_line();
Ok(writer.write_all(line.as_bytes()).await?)
}
async fn send_connect_with_writer(
writer: &mut TcpStreamWriteHalf,
connect: &Connect,
address: &Address,
) -> Result<()> {
let mut connect = connect.clone();
if let Some(authorization) = address.authorization() {
connect.set_authorization(Some(authorization.clone()));
}
Self::write_line(writer, ClientControl::Connect(&connect)).await?;
Ok(())
}
fn state_transition(&mut self, transition: StateTransition) -> StateTransitionResult {
let previous_client_state = ClientState::from(&self.state);
let previous_state = mem::replace(&mut self.state, ConnectionState::Disconnected);
let (next_state, result) = match (previous_state, transition) {
(ConnectionState::Disconnected, StateTransition::ToConnecting(address)) => (
ConnectionState::Connecting(address),
StateTransitionResult::None,
),
(ConnectionState::Connecting(_), StateTransition::ToConnecting(address)) => (
ConnectionState::Connecting(address),
StateTransitionResult::None,
),
(ConnectionState::Connecting(address), StateTransition::ToConnected(writer)) => (
ConnectionState::Connected(address, writer),
StateTransitionResult::None,
),
(ConnectionState::Connecting(_), StateTransition::ToDisconnecting) => (
ConnectionState::Disconnecting(None),
StateTransitionResult::None,
),
(ConnectionState::Connected(_, writer), StateTransition::ToDisconnecting) => (
ConnectionState::Disconnecting(Some(writer)),
StateTransitionResult::None,
),
(ConnectionState::Connected(_, writer), StateTransition::ToDisconnected) => (
ConnectionState::Disconnected,
StateTransitionResult::Writer(writer),
),
(ConnectionState::Disconnecting(Some(writer)), StateTransition::ToDisconnected) => (
ConnectionState::Disconnected,
StateTransitionResult::Writer(writer),
),
(ConnectionState::Disconnecting(None), StateTransition::ToDisconnected) => {
(ConnectionState::Disconnected, StateTransitionResult::None)
}
(_, transition) => {
error!(
"Invalid transition '{:?}' from '{}'",
transition, previous_client_state,
);
unreachable!();
}
};
self.state = next_state;
let next_client_state = ClientState::from(&self.state);
info!(
"Transitioned to state '{}' from '{}'",
next_client_state, previous_client_state
);
self.state_tx
.broadcast(next_client_state)
.expect("to broadcast state transition");
result
}
}