pub enum Message {
Text(Utf8Bytes),
Binary(Bytes),
Ping(Bytes),
Pong(Bytes),
Close(Option<CloseFrame>),
Frame(Frame),
}
An enum representing the various forms of a WebSocket message.
Variants§
Text(Utf8Bytes)
A text WebSocket message
Binary(Bytes)
A binary WebSocket message
Ping(Bytes)
A ping message with the specified payload.
The payload must be at most 125 bytes long (the WebSocket control-frame limit).
Pong(Bytes)
A pong message with the specified payload.
The payload must be at most 125 bytes long (the WebSocket control-frame limit).
Close(Option<CloseFrame>)
A close message with the optional close frame.
Frame(Frame)
Raw frame. Note that you will never receive this variant when reading messages.
Implementations§
Source§impl Message
impl Message
Source
pub fn text<S>(string: S) -> Message
Creates a new text WebSocket message from a string-like value.
Examples found in repository?
examples/echo_client.rs (line 91)
87 async fn run(&mut self) -> CoreResult<()> {
88 loop {
89 tokio::select! {
90 Ok(cmd) = self.cmd_rx.recv() => {
91 let _ = self.to_ws.send(Message::text(cmd)).await;
92 self.echo.store(true, Ordering::SeqCst);
93 }
94 Ok(msg) = self.msg_rx.recv() => {
95 if let Message::Text(txt) = &*msg && self.echo.load(Ordering::SeqCst) {
96 let _ = self.cmd_tx.send(txt.to_string()).await;
97 self.echo.store(false, Ordering::SeqCst);
98 }
99 }
100 }
101 }
102 }
103
104 fn rule(_: Arc<()>) -> Box<dyn Rule + Send + Sync> {
105 Box::new(move |msg: &Message| msg.is_text())
106 }
107}
108
109#[derive(Clone)]
110pub struct EchoHandle {
111 sender: AsyncSender<String>,
112 receiver: AsyncReceiver<String>,
113}
114
115impl EchoHandle {
116 pub async fn echo(&self, msg: String) -> CoreResult<String> {
117 let _ = self.sender.send(msg).await;
118 println!("In side echo handle, waiting for response...");
119 Ok(self.receiver.recv().await?)
120 }
121}
122
123// --- ApiModule 2: StreamModule ---
124pub struct StreamModule {
125 msg_rx: AsyncReceiver<Arc<Message>>,
126 cmd_rx: AsyncReceiver<bool>,
127 cmd_tx: AsyncSender<String>,
128 send: AtomicBool,
129}
130
131#[async_trait]
132impl ApiModule<()> for StreamModule {
133 type Command = bool;
134 type CommandResponse = String;
135 type Handle = StreamHandle;
136
137 fn new(
138 _state: Arc<()>,
139 cmd_rx: AsyncReceiver<Self::Command>,
140 cmd_ret_tx: AsyncSender<Self::CommandResponse>,
141 msg_rx: AsyncReceiver<Arc<Message>>,
142 _to_ws: AsyncSender<Message>,
143 ) -> Self {
144 Self {
145 msg_rx,
146 cmd_tx: cmd_ret_tx,
147 cmd_rx,
148 send: AtomicBool::new(false),
149 }
150 }
151
152 fn create_handle(
153 sender: AsyncSender<Self::Command>,
154 receiver: AsyncReceiver<Self::CommandResponse>,
155 ) -> Self::Handle {
156 StreamHandle { sender, receiver }
157 }
158
159 async fn run(&mut self) -> CoreResult<()> {
160 loop {
161 tokio::select! {
162 Ok(cmd) = self.cmd_rx.recv() => {
163 // Update the send flag based on the received command
164 self.send.store(cmd, Ordering::SeqCst);
165 }
166 Ok(msg) = self.msg_rx.recv() => {
167 if let Message::Text(txt) = &*msg
168 && self.send.load(Ordering::SeqCst) {
169 // Process the message if send is true
170 println!("[StreamModule] Received: {txt}");
171 let _ = self.cmd_tx.send(txt.to_string()).await;
172 }
173 }
174 else => {
175 println!("[Error] StreamModule: Channel closed");
176 },
177 }
178 }
179 }
180
181 fn rule(_: Arc<()>) -> Box<dyn Rule + Send + Sync> {
182 Box::new(move |_msg: &Message| {
183 // Accept all messages
184 true
185 })
186 }
187}
188
189#[derive(Clone)]
190pub struct StreamHandle {
191 receiver: AsyncReceiver<String>,
192 sender: AsyncSender<bool>,
193}
194
195impl StreamHandle {
196 pub async fn stream(self) -> CoreResult<impl Stream<Item = CoreResult<String>>> {
197 self.sender.send(true).await?;
198 println!("StreamHandle: Waiting for messages...");
199 Ok(Box::pin(unfold(self.receiver, |state| async move {
200 let item = state.recv().await.map_err(CoreError::from);
201 Some((item, state))
202 })))
203 }
204}
205
206// --- ApiModule 3: PeriodicSenderModule ---
207pub struct PeriodicSenderModule {
208 cmd_rx: AsyncReceiver<bool>,
209 to_ws: AsyncSender<Message>,
210 running: AtomicBool,
211}
212
213#[async_trait]
214impl ApiModule<()> for PeriodicSenderModule {
215 type Command = bool; // true = start, false = stop
216 type CommandResponse = ();
217 type Handle = PeriodicSenderHandle;
218
219 fn new(
220 _state: Arc<()>,
221 cmd_rx: AsyncReceiver<Self::Command>,
222 _cmd_ret_tx: AsyncSender<Self::CommandResponse>,
223 _msg_rx: AsyncReceiver<Arc<Message>>,
224 to_ws: AsyncSender<Message>,
225 ) -> Self {
226 Self {
227 cmd_rx,
228 to_ws,
229 running: AtomicBool::new(false),
230 }
231 }
232
233 fn create_handle(
234 sender: AsyncSender<Self::Command>,
235 _receiver: AsyncReceiver<Self::CommandResponse>,
236 ) -> Self::Handle {
237 PeriodicSenderHandle { sender }
238 }
239
240 async fn run(&mut self) -> CoreResult<()> {
241 let to_ws = self.to_ws.clone();
242 let mut interval = tokio::time::interval(Duration::from_secs(5));
243 loop {
244 tokio::select! {
245 Ok(cmd) = self.cmd_rx.recv() => {
246 self.running.store(cmd, Ordering::SeqCst);
247 }
248 _ = interval.tick() => {
249 if self.running.load(Ordering::SeqCst) {
250 let _ = to_ws.send(Message::text("Ping from periodic sender")).await;
251 }
252 }
253 }
254 }
255 }
More examples
examples/testing_echo_client.rs (line 81)
77 async fn run(&mut self) -> CoreResult<()> {
78 loop {
79 tokio::select! {
80 Ok(cmd) = self.cmd_rx.recv() => {
81 let _ = self.to_ws.send(Message::text(cmd)).await;
82 self.echo.store(true, Ordering::SeqCst);
83 }
84 Ok(msg) = self.msg_rx.recv() => {
85 if let Message::Text(txt) = &*msg && self.echo.load(Ordering::SeqCst) {
86 let _ = self.cmd_tx.send(txt.to_string()).await;
87 self.echo.store(false, Ordering::SeqCst);
88 }
89 }
90 }
91 }
92 }
Source
pub fn binary<B>(bin: B) -> Message
Create a new binary WebSocket message by converting to Bytes.
Source
pub fn is_text(&self) -> bool
Indicates whether a message is a text message.
Examples found in repository?
More examples
Source
pub fn is_empty(&self) -> bool
Returns true if the WebSocket message has no content. For example, if the other side of the connection sent an empty string.
Trait Implementations§
Source§impl<T> Sink<Message> for WebSocketStream<T>
impl<T> Sink<Message> for WebSocketStream<T>
Source§fn poll_ready(
self: Pin<&mut WebSocketStream<T>>,
cx: &mut Context<'_>,
) -> Poll<Result<(), <WebSocketStream<T> as Sink<Message>>::Error>>
fn poll_ready( self: Pin<&mut WebSocketStream<T>>, cx: &mut Context<'_>, ) -> Poll<Result<(), <WebSocketStream<T> as Sink<Message>>::Error>>
Attempts to prepare the Sink to receive a value. Read more
Source§fn start_send(
self: Pin<&mut WebSocketStream<T>>,
item: Message,
) -> Result<(), <WebSocketStream<T> as Sink<Message>>::Error>
fn start_send( self: Pin<&mut WebSocketStream<T>>, item: Message, ) -> Result<(), <WebSocketStream<T> as Sink<Message>>::Error>
Begin the process of sending a value to the sink.
Each call to this function must be preceded by a successful call to poll_ready which returned Poll::Ready(Ok(())). Read more
Source§fn poll_flush(
self: Pin<&mut WebSocketStream<T>>,
cx: &mut Context<'_>,
) -> Poll<Result<(), <WebSocketStream<T> as Sink<Message>>::Error>>
fn poll_flush( self: Pin<&mut WebSocketStream<T>>, cx: &mut Context<'_>, ) -> Poll<Result<(), <WebSocketStream<T> as Sink<Message>>::Error>>
Flush any remaining output from this sink. Read more
Source§fn poll_close(
self: Pin<&mut WebSocketStream<T>>,
cx: &mut Context<'_>,
) -> Poll<Result<(), <WebSocketStream<T> as Sink<Message>>::Error>>
fn poll_close( self: Pin<&mut WebSocketStream<T>>, cx: &mut Context<'_>, ) -> Poll<Result<(), <WebSocketStream<T> as Sink<Message>>::Error>>
Flush any remaining output and close this sink, if necessary. Read more
impl Eq for Message
impl StructuralPartialEq for Message
Auto Trait Implementations§
impl !Freeze for Message
impl RefUnwindSafe for Message
impl Send for Message
impl Sync for Message
impl Unpin for Message
impl UnwindSafe for Message
Blanket Implementations§
Source§impl<T> BorrowMut<T> for T
where
T: ?Sized,
impl<T> BorrowMut<T> for T
where
T: ?Sized,
Source§fn borrow_mut(&mut self) -> &mut T
fn borrow_mut(&mut self) -> &mut T
Mutably borrows from an owned value. Read more