Axum Websocket Broadcaster
A broadcasting library for both axum-typed-websockets and axum::extract::ws, similar to actix-ws-broadcaster.
This library is essentially the axum counterpart of actix-ws-broadcaster. Most of the APIs work the same way, and usage is almost identical, with a few exceptions.
It provides grouping and broadcasting for both WebSocket implementations in the axum ecosystem. Each receiver gets its own Connection, identified by the id you give it. Rooms group related connections into a single entity.
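To make the room and connection model concrete, here is a minimal, std-only sketch. It is not this crate's implementation: `Hub`, `Room`, `join` and `broadcast` are hypothetical names, and `std::sync::mpsc` channels stand in for WebSocket sessions.

```rust
use std::collections::HashMap;
use std::sync::mpsc::{channel, Receiver, Sender};

/// A room groups connections; each connection is keyed by its id.
#[derive(Default)]
struct Room {
    connections: HashMap<String, Sender<String>>,
}

/// Hypothetical hub that owns every room, keyed by room id.
#[derive(Default)]
struct Hub {
    rooms: HashMap<String, Room>,
}

impl Hub {
    /// Registers a connection in a room (creating the room if needed)
    /// and returns the receiving end that stands in for the client.
    fn join(&mut self, room_id: &str, conn_id: &str) -> Receiver<String> {
        let (tx, rx) = channel();
        self.rooms
            .entry(room_id.to_string())
            .or_default()
            .connections
            .insert(conn_id.to_string(), tx);
        rx
    }

    /// Sends `msg` to every connection in the room and returns how many
    /// connections accepted it. Unknown rooms yield 0.
    fn broadcast(&self, room_id: &str, msg: &str) -> usize {
        match self.rooms.get(room_id) {
            Some(room) => room
                .connections
                .values()
                .filter(|tx| tx.send(msg.to_string()).is_ok())
                .count(),
            None => 0,
        }
    }
}
```

In this sketch a message sent to one room never reaches connections registered in another, which is the grouping behaviour rooms are meant to give you.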
Guide
Adding dependency
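Add one of the following lines to your Cargo.toml, depending on the API you want:
# normal API (axum::extract::ws)
axum-ws-broadcaster = "0.11.0"
# typed API (axum-typed-websockets)
axum-ws-broadcaster = { version = "0.11.0", features = ["typed"] }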
Import
// normal API
use axum_wsb::normal::Broadcaster;
// typed API (requires the "typed" feature)
use axum_wsb::typed::Broadcaster;
Initialize
Initialize it in a place that can hold its state:
// normal API
let receivers: Arc<RwLock<Broadcaster>> = Broadcaster::new();
// typed API
let receivers: Arc<RwLock<Broadcaster<T, S>>> = Broadcaster::new();
Handle Connections And Rooms
The configure() function takes a WebSocket as its argument and returns the receiver and the stream:
let (receiver, mut stream) = Broadcaster::configure(socket);
Then handle the connections and rooms in the WebSocket route:
let broadcaster = Broadcaster::handle(&broadcaster, &room_id, &conn_id, receiver).await;
Both calls work the same way in the two APIs.
Broadcast The Messages
Note: Always broadcast through the same broadcaster instance rather than a clone of it. Otherwise you risk a data race.
The typed and normal APIs work slightly differently, as described in the two sections that follow.
Inside the WebSocket loop, when a message is received, you can broadcast it like this:
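One way to read this note: every handler should hold a handle to the same shared state, not an independent copy of it. The std-only sketch below illustrates the difference with a stand-in `Registry` type (hypothetical; the crate uses its own `Broadcaster` behind an async lock). `Arc::clone` yields another handle to the same data, while cloning the inner value produces a snapshot that no longer sees later changes.

```rust
use std::sync::{Arc, RwLock};

/// Hypothetical stand-in for shared broadcaster state.
#[derive(Clone, Default)]
struct Registry {
    connection_ids: Vec<String>,
}

/// Adds a connection id through a shared handle.
fn register(shared: &Arc<RwLock<Registry>>, id: &str) {
    shared.write().unwrap().connection_ids.push(id.to_string());
}

/// Returns (count seen through a second Arc handle,
/// count seen by a deep copy taken before the second registration).
fn shared_vs_copied() -> (usize, usize) {
    let state = Arc::new(RwLock::new(Registry::default()));
    register(&state, "alice");

    // Another handle to the same data.
    let same_state = Arc::clone(&state);
    // An independent snapshot of the inner value.
    let detached = state.read().unwrap().clone();

    register(&state, "bob");

    let shared_count = same_state.read().unwrap().connection_ids.len();
    (shared_count, detached.connection_ids.len())
}
```

The second handle observes both registrations, while the detached copy only knows about the first one, so broadcasting through such a copy would miss connections.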
Normal
If you are familiar with actix-ws-broadcaster, the normal API works almost identically to its WebSocket handling:
Message::Text(input) => {
    let mut broadcaster = broadcaster.write().await;
    let _ = broadcaster.room(&query.room).broadcast(&input).await;
}
Typed
Typed WebSockets differ slightly because their messages are typed:
Message::Item(input) => {
    // Placeholder: convert `input` into your sending type here.
    // Passing it through unchanged only works if both types match.
    let output = input;
    let mut broadcaster = broadcaster.write().await;
    let _ = broadcaster.room(&query.room).broadcast(&output).await;
},
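Because the sending and receiving types can differ in the typed API, the received item usually has to be converted before it is broadcast. A minimal sketch of that conversion step follows, assuming hypothetical `WebsocketInput` and `WebsocketOutput` structs whose fields are all `String` (the field types are an assumption for illustration):

```rust
/// Hypothetical item received from a client.
#[derive(Debug, Clone, PartialEq)]
struct WebsocketInput {
    name: String,
    id: String,
    message: String,
}

/// Hypothetical item sent to every client in the room.
#[derive(Debug, Clone, PartialEq)]
struct WebsocketOutput {
    name: String,
    id: String,
    message: String,
}

impl From<WebsocketInput> for WebsocketOutput {
    /// Moves each field of the received item into the outgoing one.
    fn from(input: WebsocketInput) -> Self {
        WebsocketOutput {
            name: input.name,
            id: input.id,
            message: input.message,
        }
    }
}
```

With an impl like this, the `Message::Item(input)` arm can build its payload with `WebsocketOutput::from(input)` before serializing or broadcasting it.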
A Comprehensive Example
Normal API
async fn websocket_handler(
    ws: WebSocketUpgrade,
    Query(query): Query<WebsocketQueries>,
    State(state): State<Arc<RwLock<Broadcaster>>>,
) -> impl IntoResponse {
    ws.on_upgrade(|socket| handle_socket(socket, Query(query), state))
}
async fn handle_socket(
    socket: WebSocket,
    Query(query): Query<WebsocketQueries>,
    state: Arc<RwLock<Broadcaster>>,
) {
    let (receiver, mut stream) = Broadcaster::configure(socket);
    let broadcaster = Broadcaster::handle(&state, &query.room, &query.id, receiver).await;
    while let Some(msg_result) = stream.next().await {
        match msg_result {
            Ok(message) => {
                match message {
                    Message::Text(input) => {
                        let mut broadcaster = broadcaster.write().await;
                        let _ = broadcaster.room(&query.room).broadcast(&input).await;
                    },
                    Message::Close(_) => {
                        // Take the write lock once for all cleanup calls.
                        let mut broadcaster = broadcaster.write().await;
                        // The calls below appear to be alternative cleanup options;
                        // keep the ones your application needs.
                        let _ = broadcaster.remove_connection(&query.id).unwrap().close().await;
                        let _ = broadcaster.room(&query.room).close(None).await;
                        let _ = broadcaster.room(&query.room).close_conn(None, &query.id).await;
                        let _ = broadcaster.remove_room(&query.room).await;
                        return;
                    },
                    Message::Ping(ping) => {
                        let mut broadcaster = broadcaster.write().await;
                        let _ = broadcaster.room(&query.room).pong(&ping).await;
                    },
                    Message::Pong(pong) => {
                        let mut broadcaster = broadcaster.write().await;
                        let _ = broadcaster.room(&query.room).ping(&pong).await;
                    },
                    Message::Binary(binary) => {
                        let mut broadcaster = broadcaster.write().await;
                        let _ = broadcaster.room(&query.room).binary(&binary).await;
                    }
                }
            },
            Err(error) => println!("an error occurred: {}", error)
        }
    }
}
Typed API
This example assumes that the broadcaster is passed as State, the sending type is String, the receiving type is WebsocketInput, and WebsocketInput and WebsocketOutput have the same fields:
async fn websocket_handler(
    ws: WebSocketUpgrade<String, WebsocketInput>,
    Query(query): Query<WebsocketQueries>,
    State(state): State<Arc<RwLock<Broadcaster<String, WebsocketInput>>>>,
) -> impl IntoResponse {
    ws.on_upgrade(|socket| handle_socket(socket, Query(query), state))
}
async fn handle_socket(
    socket: WebSocket<String, WebsocketInput>,
    Query(query): Query<WebsocketQueries>,
    state: Arc<RwLock<Broadcaster<String, WebsocketInput>>>,
) {
    let (receiver, mut stream) = Broadcaster::configure(socket);
    let broadcaster = Broadcaster::handle(&state, &query.room, &query.id, receiver).await;
    while let Some(msg_result) = stream.next().await {
        match msg_result {
            Ok(message) => {
                match message {
                    Message::Item(input) => {
                        // Build the outgoing value from the received item.
                        let output = WebsocketOutput {
                            name: input.name,
                            id: input.id,
                            message: input.message
                        };
                        // Serialize it, since the sending type is String.
                        let output = serde_json::to_string(&output).unwrap();
                        let mut broadcaster = broadcaster.write().await;
                        let _ = broadcaster.room(&query.room).broadcast(&output).await;
                    },
                    Message::Close(_) => {
                        // Take the write lock once for all cleanup calls.
                        let mut broadcaster = broadcaster.write().await;
                        // The calls below appear to be alternative cleanup options;
                        // keep the ones your application needs.
                        let _ = broadcaster.remove_connection(&query.id).unwrap().close().await;
                        let _ = broadcaster.room(&query.room).close(None).await;
                        let _ = broadcaster.room(&query.room).close_conn(None, &query.id).await;
                        let _ = broadcaster.remove_room(&query.room).await;
                        return;
                    },
                    Message::Ping(ping) => {
                        let mut broadcaster = broadcaster.write().await;
                        let _ = broadcaster.room(&query.room).pong(&ping).await;
                    },
                    Message::Pong(pong) => {
                        let mut broadcaster = broadcaster.write().await;
                        let _ = broadcaster.room(&query.room).ping(&pong).await;
                    }
                }
            },
            Err(error) => println!("an error occurred: {}", error)
        }
    }
}
Try It Yourself
To try it yourself, run one of these commands:
cargo run --example normal-example
Or:
cargo run --example typed-example --features typed
Then open http://localhost:5000 in a Firefox-based browser (such as Firefox or LibreWolf). Chromium-based browsers don't support sending query parameters to WebSockets from JavaScript, so our front-end setup does not work on them. In real-world scenarios, you have to provide the room and connection ids with a different approach.
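One possible alternative is to carry the room and connection ids in the URL path instead of the query string. The std-only sketch below parses a hypothetical `/ws/{room}/{id}` path. The route shape and the `parse_ws_path` helper are assumptions for illustration and are not part of this crate; in an axum application a path extractor would normally do this work.

```rust
/// Splits a path such as `/ws/lobby/alice` into `(room, id)`.
/// Returns `None` if the path does not have exactly that shape.
fn parse_ws_path(path: &str) -> Option<(&str, &str)> {
    let rest = path.strip_prefix("/ws/")?;
    let mut parts = rest.split('/');
    // Both segments must be present and non-empty.
    let room = parts.next().filter(|s| !s.is_empty())?;
    let id = parts.next().filter(|s| !s.is_empty())?;
    // Reject trailing segments such as `/ws/lobby/alice/extra`.
    if parts.next().is_some() {
        return None;
    }
    Some((room, id))
}
```

The returned pair could then be passed on as the room id and connection id when registering the connection.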
Contribution Guide
Issues, suggestions and pull requests are welcome.