Struct socketioxide::operators::BroadcastOperators
source · pub struct BroadcastOperators<A: Adapter = LocalAdapter> { /* private fields */ }Expand description
Chainable operators to select sockets to send a message to and to configure the message to be sent.
Implementations§
source§impl<A: Adapter> BroadcastOperators<A>
impl<A: Adapter> BroadcastOperators<A>
sourcepub fn to(self, rooms: impl RoomParam) -> Self
pub fn to(self, rooms: impl RoomParam) -> Self
Selects all sockets in the given rooms except the current socket.
If it is called from the Namespace level there will be no difference with the within() operator
If you want to include the current socket, use the within() operator.
§Example
let (_, io) = SocketIo::new_svc();
io.ns("/", |socket: SocketRef| {
socket.on("test", |socket: SocketRef, Data::<Value>(data)| async move {
let other_rooms = "room4".to_string();
// In room1, room2, room3 and room4 except the current
socket
.to("room1")
.to(["room2", "room3"])
.to(vec![other_rooms])
.emit("test", data);
});
});sourcepub fn within(self, rooms: impl RoomParam) -> Self
pub fn within(self, rooms: impl RoomParam) -> Self
Selects all sockets in the given rooms.
It does include the current socket contrary to the to() operator.
If it is called from the Namespace level there will be no difference with the to() operator
§Example
let (_, io) = SocketIo::new_svc();
io.ns("/", |socket: SocketRef| {
socket.on("test", |socket: SocketRef, Data::<Value>(data)| async move {
let other_rooms = "room4".to_string();
// In room1, room2, room3 and room4 including the current socket
socket
.within("room1")
.within(["room2", "room3"])
.within(vec![other_rooms])
.emit("test", data);
});
});sourcepub fn except(self, rooms: impl RoomParam) -> Self
pub fn except(self, rooms: impl RoomParam) -> Self
Filters out all sockets selected with the previous operators which are in the given rooms.
§Example
let (_, io) = SocketIo::new_svc();
io.ns("/", |socket: SocketRef| {
socket.on("register1", |socket: SocketRef, Data::<Value>(data)| async move {
socket.join("room1");
});
socket.on("register2", |socket: SocketRef, Data::<Value>(data)| async move {
socket.join("room2");
});
socket.on("test", |socket: SocketRef, Data::<Value>(data)| async move {
// This message will be broadcast to all sockets in the Namespace
// except for ones in room1 and the current socket
socket.broadcast().except("room1").emit("test", data);
});
});sourcepub fn local(self) -> Self
pub fn local(self) -> Self
Broadcasts to all sockets only connected on this node (when using multiple nodes). When using the default in-memory adapter, this operator is a no-op.
§Example
let (_, io) = SocketIo::new_svc();
io.ns("/", |socket: SocketRef| {
socket.on("test", |socket: SocketRef, Data::<Value>(data)| async move {
// This message will be broadcast to all sockets in this namespace and connected on this node
socket.local().emit("test", data);
});
});sourcepub fn broadcast(self) -> Self
pub fn broadcast(self) -> Self
Broadcasts to all sockets without any filtering (except the current socket).
§Example
let (_, io) = SocketIo::new_svc();
io.ns("/", |socket: SocketRef| {
socket.on("test", |socket: SocketRef, Data::<Value>(data)| async move {
// This message will be broadcast to all sockets in this namespace
socket.broadcast().emit("test", data);
});
});sourcepub fn timeout(self, timeout: Duration) -> Self
pub fn timeout(self, timeout: Duration) -> Self
Sets a custom timeout when sending a message with an acknowledgement.
See SocketIoBuilder::ack_timeout for the default timeout.
See emit_with_ack() for more details on acknowledgements.
§Example
let (_, io) = SocketIo::new_svc();
io.ns("/", |socket: SocketRef| {
socket.on("test", |socket: SocketRef, Data::<Value>(data), Bin(bin)| async move {
// Emit a test message in the room1 and room3 rooms, except for the room2
// room with the binary payload received, wait for 5 seconds for an acknowledgement
socket.to("room1")
.to("room3")
.except("room2")
.bin(bin)
.timeout(Duration::from_secs(5))
.emit_with_ack::<Value>("message-back", data)
.unwrap()
.for_each(|(id, ack)| async move {
match ack {
Ok(ack) => println!("Ack received, socket {} {:?}", id, ack),
Err(err) => println!("Ack error, socket {} {:?}", id, err),
}
}).await;
});
});sourcepub fn bin(self, binary: Vec<Vec<u8>>) -> Self
pub fn bin(self, binary: Vec<Vec<u8>>) -> Self
Adds a binary payload to the message.
§Example
let (_, io) = SocketIo::new_svc();
io.ns("/", |socket: SocketRef| {
socket.on("test", |socket: SocketRef, Data::<Value>(data), Bin(bin)| async move {
// This will send the binary payload received to all sockets in this namespace with the test message
socket.bin(bin).emit("test", data);
});
});source§impl<A: Adapter> BroadcastOperators<A>
impl<A: Adapter> BroadcastOperators<A>
sourcepub fn emit<T: Serialize>(
self,
event: impl Into<Cow<'static, str>>,
data: T
) -> Result<(), BroadcastError>
pub fn emit<T: Serialize>( self, event: impl Into<Cow<'static, str>>, data: T ) -> Result<(), BroadcastError>
Emits a message to all sockets selected with the previous operators.
If you provide array-like data (tuple, vec, arrays), it will be considered as multiple arguments. Therefore if you want to send an array as the first argument of the payload, you need to wrap it in an array or a tuple.
§Errors
- When encoding the data into JSON a
BroadcastError::Serializemay be returned. - If the underlying engine.io connection is closed for a given socket a
BroadcastError::Socket(SocketError::Closed)will be returned. - If the packet buffer is full for a given socket, a
BroadcastError::Socket(SocketError::InternalChannelFull)will be retured. SeeSocketIoBuilder::max_buffer_sizeoption for more infos on internal buffer config
Note: If a error is returned because of a specific socket, the message will still be sent to all other sockets.
§Example
let (_, io) = SocketIo::new_svc();
io.ns("/", |socket: SocketRef| {
socket.on("test", |socket: SocketRef, Data::<Value>(data), Bin(bin)| async move {
// Emit a test message in the room1 and room3 rooms, except for the room2 room with the binary payload received
socket.to("room1").to("room3").except("room2").bin(bin).emit("test", data);
// Emit a test message with multiple arguments to the client
socket.to("room1").emit("test", ("world", "hello", 1)).ok();
// Emit a test message with an array as the first argument
let arr = [1, 2, 3, 4];
socket.to("room2").emit("test", [arr]).ok();
});
});sourcepub fn emit_with_ack<V>(
self,
event: impl Into<Cow<'static, str>>,
data: impl Serialize
) -> Result<AckStream<V>, Error>
pub fn emit_with_ack<V>( self, event: impl Into<Cow<'static, str>>, data: impl Serialize ) -> Result<AckStream<V>, Error>
Emits a message to all sockets selected with the previous operators and waits for the acknowledgement(s).
The acknowledgement has a timeout specified in the config (5s by default)
(see SocketIoBuilder::ack_timeout) or with the timeout() operator.
To get acknowledgements, an AckStream is returned.
It can be used in two ways:
- As a
Stream: It will yield all theAckResponsewith their corresponding socket id received from the client. It can useful when broadcasting to multiple sockets and therefore expecting more than one acknowledgement. If you want to get the socket from this id, useio::get_socket(). - As a
Future: It will yield the firstAckResponsereceived from the client. Useful when expecting only one acknowledgement.
If the packet encoding failed a serde_json::Error is immediately returned.
If the socket is full or if it has been closed before receiving the acknowledgement,
an AckError::Socket will be yielded.
If the client didn’t respond before the timeout, the AckStream will yield
an AckError::Timeout. If the data sent by the client is not deserializable as V,
an AckError::Serde will be yielded.
§Example
let (_, io) = SocketIo::new_svc();
io.ns("/", |socket: SocketRef| {
socket.on("test", |socket: SocketRef, Data::<Value>(data), Bin(bin)| async move {
// Emit a test message in the room1 and room3 rooms,
// except for the room2 room with the binary payload received
let ack_stream = socket.to("room1")
.to("room3")
.except("room2")
.bin(bin)
.emit_with_ack::<String>("message-back", data)
.unwrap();
ack_stream.for_each(|(id, ack)| async move {
match ack {
Ok(ack) => println!("Ack received, socket {} {:?}", id, ack),
Err(err) => println!("Ack error, socket {} {:?}", id, err),
}
}).await;
});
});sourcepub fn sockets(self) -> Result<Vec<SocketRef<A>>, A::Error>
pub fn sockets(self) -> Result<Vec<SocketRef<A>>, A::Error>
Gets all sockets selected with the previous operators.
It can be used to retrieve any extension data (with the extensions feature enabled) from the sockets or to make some sockets join other rooms.
§Example
let (_, io) = SocketIo::new_svc();
io.ns("/", |socket: SocketRef| {
socket.on("test", |socket: SocketRef| async move {
// Find an extension data in each sockets in the room1 and room3 rooms, except for the room2
let sockets = socket.within("room1").within("room3").except("room2").sockets().unwrap();
for socket in sockets {
println!("Socket custom string: {:?}", socket.extensions.get::<String>());
}
});
});sourcepub fn disconnect(self) -> Result<(), Vec<DisconnectError>>
pub fn disconnect(self) -> Result<(), Vec<DisconnectError>>
Disconnects all sockets selected with the previous operators.
§Example
let (_, io) = SocketIo::new_svc();
io.ns("/", |socket: SocketRef| {
socket.on("test", |socket: SocketRef| async move {
// Disconnect all sockets in the room1 and room3 rooms, except for the room2
socket.within("room1").within("room3").except("room2").disconnect().unwrap();
});
});sourcepub fn join(self, rooms: impl RoomParam) -> Result<(), A::Error>
pub fn join(self, rooms: impl RoomParam) -> Result<(), A::Error>
Makes all sockets selected with the previous operators join the given room(s).
§Example
let (_, io) = SocketIo::new_svc();
io.ns("/", |socket: SocketRef| {
socket.on("test", |socket: SocketRef| async move {
// Add all sockets that are in the room1 and room3 to the room4 and room5
socket.within("room1").within("room3").join(["room4", "room5"]).unwrap();
});
});sourcepub fn leave(self, rooms: impl RoomParam) -> Result<(), A::Error>
pub fn leave(self, rooms: impl RoomParam) -> Result<(), A::Error>
Makes all sockets selected with the previous operators leave the given room(s).
§Example
let (_, io) = SocketIo::new_svc();
io.ns("/", |socket: SocketRef| {
socket.on("test", |socket: SocketRef| async move {
// Remove all sockets that are in the room1 and room3 from the room4 and room5
socket.within("room1").within("room3").leave(["room4", "room5"]).unwrap();
});
});