pub trait WebSocketHandler: Send + 'static {
fn handle_message(
&mut self,
message: WebSocketMessage
) -> Vec<WebSocketMessage>;
fn websocket_config(&self) -> WebSocketConfig { ... }
fn handle_start(&mut self) -> Vec<WebSocketMessage> { ... }
fn handle_close(&mut self, reconnect: bool) { ... }
}
Expand description
A trait
which is used to handle events on the WebSocketConnection.
The struct
implementing this trait
is required to be Send and 'static
because
it will be sent between threads.
Required Methods§
sourcefn handle_message(&mut self, message: WebSocketMessage) -> Vec<WebSocketMessage>
fn handle_message(&mut self, message: WebSocketMessage) -> Vec<WebSocketMessage>
Called when the WebSocketConnection received a message, returns messages to be sent to the server.
Provided Methods§
sourcefn websocket_config(&self) -> WebSocketConfig
fn websocket_config(&self) -> WebSocketConfig
Returns a prefix that will be appended to the URL of all the websocket connections being started using this handler.
Examples found in repository?
src/websocket.rs (line 60)
59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221
pub async fn new(url: &str, handler: H) -> Result<Self, tungstenite::Error> {
let config = handler.websocket_config();
let handler = Arc::new(SyncMutex::new(handler));
let url = config.url_prefix + url;
let (message_tx, message_rx) = tokio_mpsc::unbounded_channel();
let reconnect_manager = ReconnectState::new();
let connection = Arc::new(ConnectionInner {
url,
handler: Arc::clone(&handler),
message_tx,
connection_id: AtomicBool::new(false),
});
async fn feed_handler(
connection: Arc<ConnectionInner<impl WebSocketHandler>>,
mut message_rx: tokio_mpsc::UnboundedReceiver<FeederMessage>,
reconnect_manager: ReconnectState,
no_duplicate: bool,
sink: Arc<AsyncMutex<WebSocketSplitSink>>,
) {
let mut messages: HashMap<WebSocketMessage, isize> = HashMap::new();
while let Some(Some((id, message))) = message_rx.recv().await {
match message {
Ok(message) => {
if let Some(message) = WebSocketMessage::from_message(message) {
if reconnect_manager.is_reconnecting() {
// reconnecting
let id_sign: isize = if id {
1
} else {
-1
};
let entry = messages.entry(message.clone());
match entry {
Entry::Occupied(mut occupied) => {
if no_duplicate {
log::debug!("Skipping duplicate message.");
continue;
}
*occupied.get_mut() += id_sign;
if id_sign != occupied.get().signum() {
// same message which comes from different connections, so we assume it's a duplicate.
log::debug!("Skipping duplicate message.");
continue;
}
// comes from the same connection, which means the message was sent twice.
},
Entry::Vacant(vacant) => {
// new message
vacant.insert(id_sign);
}
}
} else {
messages.clear();
}
let messages = connection.handler.lock().handle_message(message);
for message in messages {
if let Err(error) = sink.lock().await.send(message.into_message()).await {
log::error!("Failed to send message due to an error: {}", error);
};
}
}
},
Err(error) => {
if reconnect_manager.request_reconnect() {
log::error!("Failed to receive message due to an error: {}, reconnecting", error);
}
},
}
}
connection.handler.lock().handle_close(false);
}
async fn reconnect<H: WebSocketHandler>(
interval: Duration,
cooldown: Duration,
connection: Arc<ConnectionInner<H>>,
sink: Arc<AsyncMutex<WebSocketSplitSink>>,
reconnect_manager: ReconnectState,
no_duplicate: bool,
window: Duration,
) {
let mut cooldown = tokio::time::interval(cooldown);
cooldown.set_missed_tick_behavior(MissedTickBehavior::Delay);
loop {
let timer = if interval.is_zero() {
// never completes
tokio::time::sleep(Duration::MAX)
} else {
tokio::time::sleep(interval)
};
tokio::select! {
_ = reconnect_manager.inner.reconnect_notify.notified() => {},
_ = timer => {},
}
cooldown.tick().await;
reconnect_manager.inner.reconnecting.store(true, Ordering::SeqCst);
// reconnect_notify might have been notified while waiting the cooldown,
// so we consume any existing permits on reconnect_notify
reconnect_manager.inner.reconnect_notify.notify_one();
// this completes immediately because we just added a permit
reconnect_manager.inner.reconnect_notify.notified().await;
// start a new connection
match WebSocketConnection::<H>::start_connection(Arc::clone(&connection)).await {
Ok(new_sink) => {
// replace the sink with the new one
let mut old_sink = mem::replace(&mut *sink.lock().await, new_sink);
if let Err(error) = old_sink.close().await {
log::warn!("An error occurred while closing old connection during auto-refresh: {}", error);
}
connection.handler.lock().handle_close(true);
},
Err(error) => {
// try reconnecting again
log::error!("Failed to reconnect due to an error: {}, reconnecting", error);
reconnect_manager.inner.reconnect_notify.notify_one();
},
}
if no_duplicate {
// make sure we don't lose any message
tokio::time::sleep(window).await;
}
reconnect_manager.inner.reconnecting.store(false, Ordering::SeqCst);
}
}
let sink = Self::start_connection(Arc::clone(&connection)).await?;
let sink = Arc::new(AsyncMutex::new(sink));
tokio::spawn(
feed_handler(
Arc::clone(&connection),
message_rx,
reconnect_manager.clone(),
config.ignore_duplicate_during_reconnection,
Arc::clone(&sink),
)
);
let task_reconnect = tokio::spawn(reconnect(
config.refresh_after,
config.connect_cooldown,
Arc::clone(&connection),
Arc::clone(&sink),
reconnect_manager.clone(),
config.ignore_duplicate_during_reconnection,
config.reconnection_window,
));
Ok(Self {
task_reconnect,
sink,
inner: connection,
reconnect_state: reconnect_manager,
})
}
sourcefn handle_start(&mut self) -> Vec<WebSocketMessage>
fn handle_start(&mut self) -> Vec<WebSocketMessage>
Called when a new connection has been started, and returns messages that should be sent to the server.
This could be called multiple times because the connection can be reconnected.
Examples found in repository?
src/websocket.rs (line 227)
223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245
async fn start_connection(connection: Arc<ConnectionInner<impl WebSocketHandler>>) -> Result<WebSocketSplitSink, tungstenite::Error> {
let (websocket_stream, _) = tokio_tungstenite::connect_async(connection.url.clone()).await?;
let (mut sink, mut stream) = websocket_stream.split();
let messages = connection.handler.lock().handle_start();
for message in messages {
sink.send(message.into_message()).await?;
}
// fetch_not is unstable, so we xor it with true which gives the same result
let id = connection.connection_id.fetch_xor(true, Ordering::SeqCst);
// pass messages to task_feed_handler
tokio::spawn(async move {
while let Some(message) = stream.next().await {
if connection.message_tx.send(Some((id, message))).is_err() {
// task_feed_handler is dropped, which means there is no one to consume messages
break;
}
}
});
Ok(sink)
}
sourcefn handle_close(&mut self, reconnect: bool)
fn handle_close(&mut self, reconnect: bool)
Called when a websocket connection is closed.
If the parameter reconnect
is:
true
, it means that the connection is being reconnected for some reason.false
, it means that the connection will not be reconnected, because the WebSocketConnection was dropped.
Examples found in repository?
src/websocket.rs (line 132)
74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190
async fn feed_handler(
connection: Arc<ConnectionInner<impl WebSocketHandler>>,
mut message_rx: tokio_mpsc::UnboundedReceiver<FeederMessage>,
reconnect_manager: ReconnectState,
no_duplicate: bool,
sink: Arc<AsyncMutex<WebSocketSplitSink>>,
) {
let mut messages: HashMap<WebSocketMessage, isize> = HashMap::new();
while let Some(Some((id, message))) = message_rx.recv().await {
match message {
Ok(message) => {
if let Some(message) = WebSocketMessage::from_message(message) {
if reconnect_manager.is_reconnecting() {
// reconnecting
let id_sign: isize = if id {
1
} else {
-1
};
let entry = messages.entry(message.clone());
match entry {
Entry::Occupied(mut occupied) => {
if no_duplicate {
log::debug!("Skipping duplicate message.");
continue;
}
*occupied.get_mut() += id_sign;
if id_sign != occupied.get().signum() {
// same message which comes from different connections, so we assume it's a duplicate.
log::debug!("Skipping duplicate message.");
continue;
}
// comes from the same connection, which means the message was sent twice.
},
Entry::Vacant(vacant) => {
// new message
vacant.insert(id_sign);
}
}
} else {
messages.clear();
}
let messages = connection.handler.lock().handle_message(message);
for message in messages {
if let Err(error) = sink.lock().await.send(message.into_message()).await {
log::error!("Failed to send message due to an error: {}", error);
};
}
}
},
Err(error) => {
if reconnect_manager.request_reconnect() {
log::error!("Failed to receive message due to an error: {}, reconnecting", error);
}
},
}
}
connection.handler.lock().handle_close(false);
}
async fn reconnect<H: WebSocketHandler>(
interval: Duration,
cooldown: Duration,
connection: Arc<ConnectionInner<H>>,
sink: Arc<AsyncMutex<WebSocketSplitSink>>,
reconnect_manager: ReconnectState,
no_duplicate: bool,
window: Duration,
) {
let mut cooldown = tokio::time::interval(cooldown);
cooldown.set_missed_tick_behavior(MissedTickBehavior::Delay);
loop {
let timer = if interval.is_zero() {
// never completes
tokio::time::sleep(Duration::MAX)
} else {
tokio::time::sleep(interval)
};
tokio::select! {
_ = reconnect_manager.inner.reconnect_notify.notified() => {},
_ = timer => {},
}
cooldown.tick().await;
reconnect_manager.inner.reconnecting.store(true, Ordering::SeqCst);
// reconnect_notify might have been notified while waiting the cooldown,
// so we consume any existing permits on reconnect_notify
reconnect_manager.inner.reconnect_notify.notify_one();
// this completes immediately because we just added a permit
reconnect_manager.inner.reconnect_notify.notified().await;
// start a new connection
match WebSocketConnection::<H>::start_connection(Arc::clone(&connection)).await {
Ok(new_sink) => {
// replace the sink with the new one
let mut old_sink = mem::replace(&mut *sink.lock().await, new_sink);
if let Err(error) = old_sink.close().await {
log::warn!("An error occurred while closing old connection during auto-refresh: {}", error);
}
connection.handler.lock().handle_close(true);
},
Err(error) => {
// try reconnecting again
log::error!("Failed to reconnect due to an error: {}, reconnecting", error);
reconnect_manager.inner.reconnect_notify.notify_one();
},
}
if no_duplicate {
// make sure we don't lose any message
tokio::time::sleep(window).await;
}
reconnect_manager.inner.reconnecting.store(false, Ordering::SeqCst);
}
}