pub struct BroadcastOperators<A: Adapter = LocalAdapter> { /* private fields */ }
Expand description
Chainable operators to select sockets to send a message to and to configure the message to be sent.
Implementations§
Source§impl<A: Adapter> BroadcastOperators<A>
impl<A: Adapter> BroadcastOperators<A>
Sourcepub fn to(self, rooms: impl RoomParam) -> Self
pub fn to(self, rooms: impl RoomParam) -> Self
§Select all the sockets in the given rooms except for the current socket.
When called from a socket, if you want to also include it, use the within()
operator.
However, when called from the io
(global context) level, there will be no difference.
§Example
async fn handler(socket: SocketRef, io: SocketIo, Data(data): Data::<Value>) {
// Emit a message to all sockets in room1, room2, and room3, except the current socket
socket
.to("room1")
.to(["room2", "room3"])
.emit("test", &data)
.await;
// Emit a message to all sockets in room1, room2, and room3, including the current socket
io
.to("room1")
.to(["room2", "room3"])
.emit("test", &data)
.await;
}
let (_, io) = SocketIo::new_svc();
io.ns("/", |s: SocketRef| s.on("test", handler));
Sourcepub fn within(self, rooms: impl RoomParam) -> Self
pub fn within(self, rooms: impl RoomParam) -> Self
§Select all the sockets in the given rooms (current one included).
This includes the current socket, in contrast to the to()
operator.
However, when called from the io
(global context) level, there will be no difference.
§Example
async fn handler(socket: SocketRef, Data(data): Data::<Value>) {
let other_rooms = "room4".to_string();
// Emit a message to all sockets in room1, room2, room3, and room4, including the current socket
socket
.within("room1")
.within(["room2", "room3"])
.within(vec![other_rooms])
.emit("test", &data)
.await;
}
let (_, io) = SocketIo::new_svc();
io.ns("/", |s: SocketRef| s.on("test", handler));
Sourcepub fn except(self, rooms: impl RoomParam) -> Self
pub fn except(self, rooms: impl RoomParam) -> Self
§Filter out all sockets selected with the previous operators that are in the specified rooms.
§Example
async fn handler(socket: SocketRef, Data(data): Data::<Value>) {
// This message will be broadcast to all sockets in the namespace,
// except for those in room1 and the current socket
socket.broadcast().except("room1").emit("test", &data).await;
}
let (_, io) = SocketIo::new_svc();
io.ns("/", |socket: SocketRef| {
socket.on("register1", |s: SocketRef| s.join("room1"));
socket.on("register2", |s: SocketRef| s.join("room2"));
socket.on("test", handler);
});
Sourcepub fn local(self) -> Self
pub fn local(self) -> Self
§Broadcast to all sockets only connected to this node.
When using the default in-memory adapter, this operator is a no-op.
§Example
async fn handler(socket: SocketRef, Data(data): Data::<Value>) {
// This message will be broadcast to all sockets in this
// namespace that are connected to this node
socket.local().emit("test", &data).await;
}
let (_, io) = SocketIo::new_svc();
io.ns("/", |s: SocketRef| s.on("test", handler));
Sourcepub fn broadcast(self) -> Self
pub fn broadcast(self) -> Self
§Broadcast to all sockets without any filtering (except the current socket).
If you want to include the current socket use the broadcast operators from the io
global context.
§Example
async fn handler(io: SocketIo, socket: SocketRef, Data(data): Data::<Value>) {
// This message will be broadcast to all sockets in this namespace except this one.
socket.broadcast().emit("test", &data).await;
// This message will be broadcast to all sockets in this namespace, including this one.
io.emit("test", &data).await;
}
let (_, io) = SocketIo::new_svc();
io.ns("/", |s: SocketRef| s.on("test", handler));
Sourcepub fn timeout(self, timeout: Duration) -> Self
pub fn timeout(self, timeout: Duration) -> Self
§Set a custom timeout when sending a message with an acknowledgement.
- See SocketIoBuilder::ack_timeout for the default timeout.
- See emit_with_ack() for more details on acknowledgements.
§Example
async fn handler(socket: SocketRef, Data(data): Data::<Value>) {
// Emit a test message in the room1 and room3 rooms, except for the room2
// room with the binary payload received, and wait for 5 seconds for an acknowledgement
socket.to("room1")
.to("room3")
.except("room2")
.timeout(Duration::from_secs(5))
.emit_with_ack::<_, Value>("message-back", &data)
.await
.unwrap()
.for_each(async |(id, ack)| {
match ack {
Ok(ack) => println!("Ack received from socket {}: {:?}", id, ack),
Err(err) => println!("Ack error from socket {}: {:?}", id, err),
}
}).await;
}
let (_, io) = SocketIo::new_svc();
io.ns("/", |s: SocketRef| s.on("test", handler));
Source§impl<A: Adapter> BroadcastOperators<A>
impl<A: Adapter> BroadcastOperators<A>
Sourcepub fn emit<T: ?Sized + Serialize>(
self,
event: impl AsRef<str>,
data: &T,
) -> impl Future<Output = Result<(), BroadcastError>> + Send
pub fn emit<T: ?Sized + Serialize>( self, event: impl AsRef<str>, data: &T, ) -> impl Future<Output = Result<(), BroadcastError>> + Send
§Emit a message to one or many clients
If you provide tuple-like data (tuples, arrays), it will be considered as multiple arguments.
Therefore, if you want to send an array as the first argument of the payload,
you need to wrap it in an array or a tuple. A Vec
will always be considered a single argument.
§Emitting binary data
To emit binary data, you must use a data type that implements Serialize
as binary data.
Currently, if you use Vec<u8>, it will be considered a sequence of numbers rather than binary data.
To handle this, you can either use a special type like Bytes or the serde_bytes crate.
If you want to emit generic data that may contain binary, use rmpv::Value instead of serde_json::Value, as binary data will otherwise be serialized as a sequence of numbers.
§Errors
- When encoding the data, a SendError::Serialize may be returned.
- If the underlying engine.io connection is closed, a SendError::Socket(SocketError::Closed) will be returned, and the data you attempted to send will be included in the error.
- If the packet buffer is full, a SendError::Socket(SocketError::InternalChannelFull) will be returned, and the data you attempted to send will be included in the error. See the SocketIoBuilder::max_buffer_size option for more information on internal buffer configuration.
§Single-socket example
fn handler(socket: SocketRef, Data(data): Data::<Value>) {
// Emit a test message to the client
socket.emit("test", &data).ok();
// Emit a test message with multiple arguments to the client
socket.emit("test", &("world", "hello", 1)).ok();
// Emit a test message with an array as the first argument
let arr = [1, 2, 3, 4];
socket.emit("test", &[arr]).ok();
}
let (_, io) = SocketIo::new_svc();
io.ns("/", |socket: SocketRef| socket.on("test", handler));
§Single-socket binary example with the bytes
crate
fn handler(socket: SocketRef, Data(data): Data::<(String, Bytes, Bytes)>) {
// Emit a test message to the client
socket.emit("test", &data).ok();
// Emit a test message with multiple arguments to the client
socket.emit("test", &("world", "hello", Bytes::from_static(&[1, 2, 3, 4]))).ok();
// Emit a test message with an array as the first argument
let arr = [1, 2, 3, 4];
socket.emit("test", &[arr]).ok();
}
let (_, io) = SocketIo::new_svc();
io.ns("/", |socket: SocketRef| socket.on("test", handler));
§Broadcast example
Here the emit method will return a Future
that must be awaited because socket.io may communicate
with remote instances if you use horizontal scaling through remote adapters.
async fn handler(socket: SocketRef, Data(data): Data::<(String, Bytes, Bytes)>) {
// Emit a test message in the room1 and room3 rooms, except for room2, with the received binary payload
socket.to("room1").to("room3").except("room2").emit("test", &data).await;
// Emit a test message with multiple arguments to the client
socket.to("room1").emit("test", &("world", "hello", 1)).await;
// Emit a test message with an array as the first argument
let arr = [1, 2, 3, 4];
socket.to("room2").emit("test", &[arr]).await;
}
let (_, io) = SocketIo::new_svc();
io.ns("/", |s: SocketRef| s.on("test", handler));
Sourcepub fn emit_with_ack<T: ?Sized + Serialize, V>(
self,
event: impl AsRef<str>,
data: &T,
) -> impl Future<Output = Result<AckStream<V, A>, EmitWithAckError>> + Send
pub fn emit_with_ack<T: ?Sized + Serialize, V>( self, event: impl AsRef<str>, data: &T, ) -> impl Future<Output = Result<AckStream<V, A>, EmitWithAckError>> + Send
§Emit a message to one or many clients and wait for one or more acknowledgments.
See emit()
for more details on emitting messages.
The acknowledgment has a timeout specified in the config (5 seconds by default; see SocketIoBuilder::ack_timeout), or it can be set with the timeout() operator.
To receive acknowledgments, an AckStream
is returned. It can be used in two ways:
- As a Stream: This will yield all the acknowledgment responses, along with the corresponding socket ID, received from the client. This is useful when broadcasting to multiple sockets and expecting more than one acknowledgment. To get the socket from this ID, use io::get_socket().
- As a Future: This will yield the first acknowledgment response received from the client, useful when expecting only one acknowledgment.
§Errors
If packet encoding fails, a ParserError is immediately returned.
If the socket is full or if it is closed before receiving the acknowledgment,
a SendError::Socket
will be immediately returned, and the value to send will be given back.
If the client does not respond before the timeout, the AckStream will yield an AckError::Timeout.
If the data sent by the client is not deserializable as V, an AckError::Decode will be yielded.
§Single-socket example
async fn handler(socket: SocketRef, Data(data): Data::<Value>) {
// Emit a test message and wait for an acknowledgment with the timeout specified in the global config
match socket.emit_with_ack::<_, Value>("test", &data).unwrap().await {
Ok(ack) => println!("Ack received {:?}", ack),
Err(err) => println!("Ack error {:?}", err),
}
}
let (_, io) = SocketIo::new_svc();
io.ns("/", |socket: SocketRef| socket.on("test", handler));
§Single-socket example with custom acknowledgment timeout
async fn handler(socket: SocketRef, Data(data): Data::<Value>) {
// Emit a test message and wait for an acknowledgment with the timeout specified here
match socket.timeout(Duration::from_millis(2)).emit_with_ack::<_, Value>("test", &data).unwrap().await {
Ok(ack) => println!("Ack received {:?}", ack),
Err(err) => println!("Ack error {:?}", err),
}
}
let (_, io) = SocketIo::new_svc();
io.ns("/", |socket: SocketRef| socket.on("test", handler));
§Broadcast example
Here the emit_with_ack method will return a Future
that must be awaited because socket.io may communicate
with remote instances if you use horizontal scaling through remote adapters.
async fn handler(socket: SocketRef, Data(data): Data::<Value>) {
// Emit a test message in the room1 and room3 rooms,
// except for room2, with the binary payload received
let ack_stream = socket.to("room1")
.to("room3")
.except("room2")
.emit_with_ack::<_, String>("message-back", &data)
.await
.unwrap();
ack_stream.for_each(async |(id, ack)| {
match ack {
Ok(ack) => println!("Ack received, socket {} {:?}", id, ack),
Err(err) => println!("Ack error, socket {} {:?}", id, err),
}
}).await;
}
let (_, io) = SocketIo::new_svc();
io.ns("/", |s: SocketRef| s.on("test", handler));
Sourcepub fn sockets(self) -> Vec<SocketRef<A>>
pub fn sockets(self) -> Vec<SocketRef<A>>
§Get all the local sockets selected with the previous operators.
This can be used to retrieve any extension data (with the extensions
feature enabled) from the sockets or to make certain sockets join other rooms.
Use fetch_sockets() to get remote sockets.
§Example
async fn handler(socket: SocketRef) {
// Find extension data in each socket in the room1 and room3 rooms, except for room2
let sockets = socket.within("room1").within("room3").except("room2").sockets();
for socket in sockets {
println!("Socket extension: {:?}", socket.extensions.get::<String>());
}
}
let (_, io) = SocketIo::new_svc();
io.ns("/", |s: SocketRef| s.on("test", handler));
Sourcepub async fn fetch_sockets(self) -> Result<Vec<RemoteSocket<A>>, A::Error>
pub async fn fetch_sockets(self) -> Result<Vec<RemoteSocket<A>>, A::Error>
§Get all the local and remote sockets selected with the previous operators.
Use sockets() if you only have a single node.
Avoid using this method if you want to immediately perform actions on the sockets. Instead, directly apply the actions using operators:
§Correct Approach
io.within("room1").emit("foo", "bar").await.unwrap();
io.within("room1").disconnect().await.unwrap();
§Incorrect Approach
let sockets = io.within("room1").fetch_sockets().await.unwrap();
for socket in sockets {
socket.emit("test", &"Hello").await.unwrap();
socket.leave("room1").await.unwrap();
}
§Example
let (_, io) = SocketIo::new_svc();
let sockets = io.within("room1").fetch_sockets().await.unwrap();
for socket in sockets {
println!("Socket ID: {:?}", socket.data().id);
}
Sourcepub async fn disconnect(self) -> Result<(), BroadcastError>
pub async fn disconnect(self) -> Result<(), BroadcastError>
§Disconnect all sockets selected with the previous operators.
This will return a Future
that must be awaited because socket.io may communicate with remote instances
if you use horizontal scaling through remote adapters.
§Example from a socket
async fn handler(socket: SocketRef) {
// Disconnect all sockets in the room1 and room3 rooms, except for room2.
socket.within("room1").within("room3").except("room2").disconnect().await.unwrap();
}
let (_, io) = SocketIo::new_svc();
io.ns("/", |s: SocketRef| s.on("test", handler));
§Example from the io struct
async fn handler(socket: SocketRef, io: SocketIo) {
// Disconnect all sockets in the room1 and room3 rooms, except for room2.
io.within("room1").within("room3").except("room2").disconnect().await.unwrap();
}
let (_, io) = SocketIo::new_svc();
io.ns("/", |s: SocketRef| s.on("test", handler));
Sourcepub fn join(
self,
rooms: impl RoomParam,
) -> impl Future<Output = Result<(), A::Error>> + Send
pub fn join( self, rooms: impl RoomParam, ) -> impl Future<Output = Result<(), A::Error>> + Send
§Add all sockets selected with the previous operators to the specified room(s).
This will return a Future
that must be awaited because socket.io may communicate with remote instances
if you use horizontal scaling through remote adapters.
§Example
async fn handler(socket: SocketRef) {
// Add all sockets that are in room1 and room3 to room4 and room5
socket.within("room1").within("room3").join(["room4", "room5"]).await.unwrap();
// We should retrieve all the local sockets that are in room4 and room5
let sockets = socket.within("room4").within("room5").sockets();
}
let (_, io) = SocketIo::new_svc();
io.ns("/", |s: SocketRef| s.on("test", handler));
Sourcepub fn leave(
self,
rooms: impl RoomParam,
) -> impl Future<Output = Result<(), A::Error>> + Send
pub fn leave( self, rooms: impl RoomParam, ) -> impl Future<Output = Result<(), A::Error>> + Send
§Remove all sockets selected with the previous operators from the specified room(s).
This will return a Future
that must be awaited because socket.io may communicate with remote instances
if you use horizontal scaling through remote adapters.
§Example
async fn handler(socket: SocketRef) {
// Remove all sockets that are in room1 and room3 from room4 and room5
socket.within("room1").within("room3").leave(["room4", "room5"]).await.unwrap();
}
let (_, io) = SocketIo::new_svc();
io.ns("/", |s: SocketRef| s.on("test", handler));
Sourcepub async fn rooms(self) -> Result<Vec<Room>, A::Error>
pub async fn rooms(self) -> Result<Vec<Room>, A::Error>
§Get all the rooms selected with the previous operators.
This will return a Future
that must be awaited because socket.io may communicate with remote instances
if you use horizontal scaling through remote adapters.
§Example
async fn handler(socket: SocketRef, io: SocketIo) {
println!("Socket connected to the / namespace with id: {}", socket.id);
let rooms = io.rooms().await.unwrap();
println!("All rooms in the / namespace: {:?}", rooms);
}
let (_, io) = SocketIo::new_svc();
io.ns("/", handler);