pub enum Message {
Text(Utf8Bytes),
Binary(Bytes),
Ping(Bytes),
Pong(Bytes),
Close(Option<CloseFrame>),
Frame(Frame),
}
Expand description
An enum representing the various forms of a WebSocket message.
Variants§
Text(Utf8Bytes)
A text WebSocket message
Binary(Bytes)
A binary WebSocket message
Ping(Bytes)
A ping message with the specified payload.
The payload must be at most 125 bytes long (the WebSocket limit for control frames).
Pong(Bytes)
A pong message with the specified payload.
The payload must be at most 125 bytes long (the WebSocket limit for control frames).
Close(Option<CloseFrame>)
A close message with the optional close frame.
Frame(Frame)
Raw frame. Note that you will never receive this variant when reading messages.
Implementations§
Source§impl Message
impl Message
Source pub fn text<S>(string: S) -> Message
pub fn text<S>(string: S) -> Message
Create a new text WebSocket message from a stringable.
Examples found in repository?
examples/echo_client.rs (line 92)
88 async fn run(&mut self) -> CoreResult<()> {
89 loop {
90 tokio::select! {
91 Ok(cmd) = self.cmd_rx.recv() => {
92 let _ = self.to_ws.send(Message::text(cmd)).await;
93 self.echo.store(true, Ordering::SeqCst);
94 }
95 Ok(msg) = self.msg_rx.recv() => {
96 if let Message::Text(txt) = &*msg {
97 if self.echo.load(Ordering::SeqCst) {
98 let _ = self.cmd_tx.send(txt.to_string()).await;
99 self.echo.store(false, Ordering::SeqCst);
100 }
101 }
102 }
103 }
104 }
105 }
106
107 fn rule(_: Arc<()>) -> Box<dyn Rule + Send + Sync> {
108 Box::new(move |msg: &Message| msg.is_text())
109 }
110}
111
112#[derive(Clone)]
113pub struct EchoHandle {
114 sender: AsyncSender<String>,
115 receiver: AsyncReceiver<String>,
116}
117
118impl EchoHandle {
119 pub async fn echo(&self, msg: String) -> CoreResult<String> {
120 let _ = self.sender.send(msg).await;
121 println!("In side echo handle, waiting for response...");
122 Ok(self.receiver.recv().await?)
123 }
124}
125
126// --- ApiModule 2: StreamModule ---
127pub struct StreamModule {
128 msg_rx: AsyncReceiver<Arc<Message>>,
129 cmd_rx: AsyncReceiver<bool>,
130 cmd_tx: AsyncSender<String>,
131 send: AtomicBool,
132}
133
134#[async_trait]
135impl ApiModule<()> for StreamModule {
136 type Command = bool;
137 type CommandResponse = String;
138 type Handle = StreamHandle;
139
140 fn new(
141 _state: Arc<()>,
142 cmd_rx: AsyncReceiver<Self::Command>,
143 cmd_ret_tx: AsyncSender<Self::CommandResponse>,
144 msg_rx: AsyncReceiver<Arc<Message>>,
145 _to_ws: AsyncSender<Message>,
146 _: AsyncSender<RunnerCommand>,
147 ) -> Self {
148 Self {
149 msg_rx,
150 cmd_tx: cmd_ret_tx,
151 cmd_rx,
152 send: AtomicBool::new(false),
153 }
154 }
155
156 fn create_handle(
157 sender: AsyncSender<Self::Command>,
158 receiver: AsyncReceiver<Self::CommandResponse>,
159 ) -> Self::Handle {
160 StreamHandle { sender, receiver }
161 }
162
163 async fn run(&mut self) -> CoreResult<()> {
164 loop {
165 tokio::select! {
166 Ok(cmd) = self.cmd_rx.recv() => {
167 // Update the send flag based on the received command
168 self.send.store(cmd, Ordering::SeqCst);
169 }
170 Ok(msg) = self.msg_rx.recv() => {
171 if let Message::Text(txt) = &*msg {
172 if self.send.load(Ordering::SeqCst) {
173 // Process the message if send is true
174 println!("[StreamModule] Received: {txt}");
175 let _ = self.cmd_tx.send(txt.to_string()).await;
176 }
177 }
178 }
179 else => {
180 println!("[Error] StreamModule: Channel closed");
181 },
182 }
183 }
184 }
185
186 fn rule(_: Arc<()>) -> Box<dyn Rule + Send + Sync> {
187 Box::new(move |_msg: &Message| {
188 // Accept all messages
189 true
190 })
191 }
192}
193
194#[derive(Clone)]
195pub struct StreamHandle {
196 receiver: AsyncReceiver<String>,
197 sender: AsyncSender<bool>,
198}
199
200impl StreamHandle {
201 pub async fn stream(self) -> CoreResult<impl Stream<Item = CoreResult<String>>> {
202 self.sender.send(true).await?;
203 println!("StreamHandle: Waiting for messages...");
204 Ok(Box::pin(unfold(self.receiver, |state| async move {
205 let item = state.recv().await.map_err(CoreError::from);
206 Some((item, state))
207 })))
208 }
209}
210
211// --- ApiModule 3: PeriodicSenderModule ---
212pub struct PeriodicSenderModule {
213 cmd_rx: AsyncReceiver<bool>,
214 to_ws: AsyncSender<Message>,
215 running: AtomicBool,
216}
217
218#[async_trait]
219impl ApiModule<()> for PeriodicSenderModule {
220 type Command = bool; // true = start, false = stop
221 type CommandResponse = ();
222 type Handle = PeriodicSenderHandle;
223
224 fn new(
225 _state: Arc<()>,
226 cmd_rx: AsyncReceiver<Self::Command>,
227 _cmd_ret_tx: AsyncSender<Self::CommandResponse>,
228 _msg_rx: AsyncReceiver<Arc<Message>>,
229 to_ws: AsyncSender<Message>,
230 _: AsyncSender<RunnerCommand>,
231 ) -> Self {
232 Self {
233 cmd_rx,
234 to_ws,
235 running: AtomicBool::new(false),
236 }
237 }
238
239 fn create_handle(
240 sender: AsyncSender<Self::Command>,
241 _receiver: AsyncReceiver<Self::CommandResponse>,
242 ) -> Self::Handle {
243 PeriodicSenderHandle { sender }
244 }
245
246 async fn run(&mut self) -> CoreResult<()> {
247 let to_ws = self.to_ws.clone();
248 let mut interval = tokio::time::interval(Duration::from_secs(5));
249 loop {
250 tokio::select! {
251 Ok(cmd) = self.cmd_rx.recv() => {
252 self.running.store(cmd, Ordering::SeqCst);
253 }
254 _ = interval.tick() => {
255 if self.running.load(Ordering::SeqCst) {
256 let _ = to_ws.send(Message::text("Ping from periodic sender")).await;
257 }
258 }
259 }
260 }
261 }
More examples
examples/testing_echo_client.rs (line 82)
78 async fn run(&mut self) -> CoreResult<()> {
79 loop {
80 tokio::select! {
81 Ok(cmd) = self.cmd_rx.recv() => {
82 let _ = self.to_ws.send(Message::text(cmd)).await;
83 self.echo.store(true, Ordering::SeqCst);
84 }
85 Ok(msg) = self.msg_rx.recv() => {
86 if let Message::Text(txt) = &*msg {
87 if self.echo.load(Ordering::SeqCst) {
88 let _ = self.cmd_tx.send(txt.to_string()).await;
89 self.echo.store(false, Ordering::SeqCst);
90 }
91 }
92 }
93 }
94 }
95 }
Source pub fn binary<B>(bin: B) -> Message
pub fn binary<B>(bin: B) -> Message
Create a new binary WebSocket message by converting to Bytes.
Source pub fn is_text(&self) -> bool
pub fn is_text(&self) -> bool
Indicates whether a message is a text message.
Examples found in repository?
More examples
Source pub fn is_empty(&self) -> bool
pub fn is_empty(&self) -> bool
Returns true if the WebSocket message has no content. For example, if the other side of the connection sent an empty string.
Trait Implementations§
Source§impl<T> Sink<Message> for WebSocketStream<T>
impl<T> Sink<Message> for WebSocketStream<T>
Source§fn poll_ready(
self: Pin<&mut WebSocketStream<T>>,
cx: &mut Context<'_>,
) -> Poll<Result<(), <WebSocketStream<T> as Sink<Message>>::Error>>
fn poll_ready( self: Pin<&mut WebSocketStream<T>>, cx: &mut Context<'_>, ) -> Poll<Result<(), <WebSocketStream<T> as Sink<Message>>::Error>>
Attempts to prepare the Sink to receive a value. Read more
Source§fn start_send(
self: Pin<&mut WebSocketStream<T>>,
item: Message,
) -> Result<(), <WebSocketStream<T> as Sink<Message>>::Error>
fn start_send( self: Pin<&mut WebSocketStream<T>>, item: Message, ) -> Result<(), <WebSocketStream<T> as Sink<Message>>::Error>
Begin the process of sending a value to the sink.
Each call to this function must be preceded by a successful call to poll_ready which returned Poll::Ready(Ok(())). Read more
Source§fn poll_flush(
self: Pin<&mut WebSocketStream<T>>,
cx: &mut Context<'_>,
) -> Poll<Result<(), <WebSocketStream<T> as Sink<Message>>::Error>>
fn poll_flush( self: Pin<&mut WebSocketStream<T>>, cx: &mut Context<'_>, ) -> Poll<Result<(), <WebSocketStream<T> as Sink<Message>>::Error>>
Flush any remaining output from this sink. Read more
Source§fn poll_close(
self: Pin<&mut WebSocketStream<T>>,
cx: &mut Context<'_>,
) -> Poll<Result<(), <WebSocketStream<T> as Sink<Message>>::Error>>
fn poll_close( self: Pin<&mut WebSocketStream<T>>, cx: &mut Context<'_>, ) -> Poll<Result<(), <WebSocketStream<T> as Sink<Message>>::Error>>
Flush any remaining output and close this sink, if necessary. Read more
impl Eq for Message
impl StructuralPartialEq for Message
Auto Trait Implementations§
impl !Freeze for Message
impl RefUnwindSafe for Message
impl Send for Message
impl Sync for Message
impl Unpin for Message
impl UnsafeUnpin for Message
impl UnwindSafe for Message
Blanket Implementations§
Source§impl<T> BorrowMut<T> for T
where
T: ?Sized,
impl<T> BorrowMut<T> for T
where
T: ?Sized,
Source§fn borrow_mut(&mut self) -> &mut T
fn borrow_mut(&mut self) -> &mut T
Mutably borrows from an owned value. Read more