use std::fmt::{self, Debug};
use std::sync::Arc;
use std::time::Duration;
use crate::channel::{connect_websocket, TrySendError, WebSocketReceiver};
use crate::error::{Error, Result};
use crate::model::outgoing::OutgoingMessage;
#[cfg(feature = "async-tungstenite09")]
use async_tungstenite09 as async_tungstenite;
#[cfg(feature = "async-std-runtime")]
use async_std::task;
#[cfg(feature = "async-std-runtime")]
use async_std::task::sleep;
use async_tungstenite::tungstenite::Error as WsError;
use futures::stream::StreamExt;
use log::{info, warn};
#[cfg(feature = "tokio-runtime")]
use tokio::task;
#[cfg(feature = "tokio-runtime")]
use tokio::time::sleep;
#[cfg(feature = "tokio02-runtime")]
use tokio02::task;
#[cfg(feature = "tokio02-runtime")]
use tokio02::time::delay_for as sleep;
use url::Url;
pub mod channel;
pub mod handler;
pub mod model;
use channel::{control_channel, ControlReceiver, ControlSender};
use handler::Handler;
use model::SharedBrokerState;
#[derive(Debug)]
pub(crate) struct Broker {
broker_rx: ControlReceiver,
handler: Handler,
reconnect: ReconnectConfig,
url: Url,
}
#[derive(Clone)]
pub struct ReconnectCondition {
inner: ReconnectConditionKind,
}
impl Debug for ReconnectCondition {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
f.debug_tuple("ReconnectCondition")
.field(&self.inner)
.finish()
}
}
#[derive(Clone)]
enum ReconnectConditionKind {
Always,
Never,
UnexpectedReset,
Custom(Arc<dyn Fn(&Error) -> bool + Send + Sync + 'static>),
}
impl Debug for ReconnectConditionKind {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
match self {
ReconnectConditionKind::Always => f.debug_tuple("Always").finish(),
ReconnectConditionKind::Never => f.debug_tuple("Never").finish(),
ReconnectConditionKind::UnexpectedReset => f.debug_tuple("UnexpectedReset").finish(),
ReconnectConditionKind::Custom(_) => f.debug_tuple("Custom").finish(),
}
}
}
impl ReconnectCondition {
pub fn always() -> Self {
ReconnectCondition {
inner: ReconnectConditionKind::Always,
}
}
pub fn never() -> Self {
ReconnectCondition {
inner: ReconnectConditionKind::Never,
}
}
pub fn unexpected_reset() -> Self {
ReconnectCondition {
inner: ReconnectConditionKind::UnexpectedReset,
}
}
pub fn custom<F>(f: F) -> Self
where
F: Fn(&Error) -> bool + Send + Sync + 'static,
{
ReconnectCondition {
inner: ReconnectConditionKind::Custom(Arc::new(f)),
}
}
fn should_reconnect(&self, err: &Error) -> bool {
match &self.inner {
ReconnectConditionKind::Always => true,
ReconnectConditionKind::Never => false,
ReconnectConditionKind::UnexpectedReset => {
let ws = match err {
Error::WebSocket(ws) => ws,
_ => return false,
};
use std::io::ErrorKind;
match ws.as_ref() {
WsError::Protocol(_) => true,
WsError::Io(e) => {
e.kind() == ErrorKind::ConnectionReset || e.kind() == ErrorKind::BrokenPipe
}
_ => false,
}
}
ReconnectConditionKind::Custom(f) => f(err),
}
}
}
impl Default for ReconnectCondition {
fn default() -> ReconnectCondition {
ReconnectCondition::unexpected_reset()
}
}
#[derive(Debug, Clone)]
pub struct ReconnectConfig {
pub interval: Duration,
pub condition: ReconnectCondition,
pub retry_send: bool,
}
impl ReconnectConfig {
pub fn none() -> ReconnectConfig {
ReconnectConfig::with_condition(ReconnectCondition::never())
}
pub fn with_condition(condition: ReconnectCondition) -> ReconnectConfig {
ReconnectConfig {
condition,
..Default::default()
}
}
pub fn with_interval(interval: Duration) -> ReconnectConfig {
ReconnectConfig {
interval,
..Default::default()
}
}
}
impl Default for ReconnectConfig {
fn default() -> ReconnectConfig {
ReconnectConfig {
interval: Duration::from_secs(5),
condition: ReconnectCondition::default(),
retry_send: true,
}
}
}
impl Broker {
pub async fn spawn(
url: Url,
reconnect: ReconnectConfig,
) -> Result<(ControlSender, SharedBrokerState)> {
let state = SharedBrokerState::working();
let shared_state = SharedBrokerState::clone(&state);
let (broker_tx, broker_rx) = control_channel(SharedBrokerState::clone(&state));
task::spawn(async move {
let mut broker = Broker {
url,
broker_rx,
reconnect,
handler: Handler::new(),
};
if let Some(err) = broker.run().await {
state.set_error(err).await;
} else {
state.set_exited().await;
}
std::mem::drop(broker);
});
Ok((broker_tx, shared_state))
}
async fn run(&mut self) -> Option<Error> {
let mut remaining_message = None;
loop {
let err = match self.task(remaining_message.take()).await {
Ok(()) => {
info!("broker: exited normally");
return None;
}
Err(e) => e,
};
info!("broker: task exited with error: {:?}", err.error);
if !self.reconnect.condition.should_reconnect(&err.error) {
warn!("broker: died with error");
return Some(err.error);
}
if self.reconnect.retry_send {
remaining_message = err.remaining_message;
}
info!(
"broker: attempt to reconnect in {:?}",
self.reconnect.interval
);
sleep(self.reconnect.interval).await;
}
}
async fn clean_handler(&mut self, websocket_rx: &mut WebSocketReceiver) -> Result<()> {
if self.handler.is_empty() {
return Ok(());
}
info!("broker: handler is not empty, enter receiving loop");
while !self.handler.is_empty() {
let msg = websocket_rx.recv().await?;
self.handler.handle(msg).await?;
}
Ok(())
}
async fn task(
&mut self,
remaining_message: Option<OutgoingMessage>,
) -> std::result::Result<(), TaskError> {
use futures::future::{self, Either};
let (mut websocket_tx, mut websocket_rx) = match connect_websocket(self.url.clone()).await {
Ok(x) => x,
Err(error) => {
return Err(TaskError {
remaining_message,
error,
});
}
};
info!("broker: started");
if let Some(message) = remaining_message {
websocket_tx.try_send(message).await?;
}
for message in self.handler.restore_messages() {
websocket_tx.try_send(message).await?;
}
loop {
let t1 = websocket_rx.recv();
let t2 = self.broker_rx.next();
futures::pin_mut!(t1, t2);
match future::select(t1, t2).await {
Either::Left((msg, _)) => {
while let Some(ctrl) = self.broker_rx.try_recv() {
#[cfg(feature = "inspect-contents")]
log::debug!("broker: received control {:?}", ctrl);
if let Some(out) = self.handler.control(ctrl) {
websocket_tx.try_send(out).await?
}
}
self.handler.handle(msg?).await?;
}
Either::Right((Some(ctrl), _)) => {
#[cfg(feature = "inspect-contents")]
log::debug!("broker: received control {:?}", ctrl);
if let Some(out) = self.handler.control(ctrl) {
websocket_tx.try_send(out).await?
}
}
Either::Right((None, _)) => {
info!("broker: all controls terminated, exiting gracefully");
return Ok(self.clean_handler(&mut websocket_rx).await?);
}
}
}
}
}
#[derive(Debug, Clone)]
struct TaskError {
remaining_message: Option<OutgoingMessage>,
error: Error,
}
impl From<Error> for TaskError {
fn from(error: Error) -> TaskError {
TaskError {
remaining_message: None,
error,
}
}
}
impl From<TrySendError> for TaskError {
fn from(err: TrySendError) -> TaskError {
let TrySendError { message, error } = err;
TaskError {
remaining_message: Some(message),
error,
}
}
}