Skip to main content

Message

Enum Message 

Source
pub enum Message {
    Text(Utf8Bytes),
    Binary(Bytes),
    Ping(Bytes),
    Pong(Bytes),
    Close(Option<CloseFrame>),
    Frame(Frame),
}
Expand description

An enum representing the various forms of a WebSocket message.

Variants§

§

Text(Utf8Bytes)

A text WebSocket message

§

Binary(Bytes)

A binary WebSocket message

§

Ping(Bytes)

A ping message with the specified payload

The payload here must have a length less than 125 bytes

§

Pong(Bytes)

A pong message with the specified payload

The payload here must have a length less than 125 bytes

§

Close(Option<CloseFrame>)

A close message with the optional close frame.

§

Frame(Frame)

Raw frame. Note, that you’re not going to get this value while reading the message.

Implementations§

Source§

impl Message

Source

pub fn text<S>(string: S) -> Message
where S: Into<Utf8Bytes>,

Create a new text WebSocket message from a stringable.

Examples found in repository?
examples/echo_client.rs (line 92)
88    async fn run(&mut self) -> CoreResult<()> {
89        loop {
90            tokio::select! {
91                Ok(cmd) = self.cmd_rx.recv() => {
92                    let _ = self.to_ws.send(Message::text(cmd)).await;
93                    self.echo.store(true, Ordering::SeqCst);
94                }
95                Ok(msg) = self.msg_rx.recv() => {
96                    if let Message::Text(txt) = &*msg {
97                        if self.echo.load(Ordering::SeqCst) {
98                            let _ = self.cmd_tx.send(txt.to_string()).await;
99                            self.echo.store(false, Ordering::SeqCst);
100                        }
101                    }
102                }
103            }
104        }
105    }
106
107    fn rule(_: Arc<()>) -> Box<dyn Rule + Send + Sync> {
108        Box::new(move |msg: &Message| msg.is_text())
109    }
110}
111
112#[derive(Clone)]
113pub struct EchoHandle {
114    sender: AsyncSender<String>,
115    receiver: AsyncReceiver<String>,
116}
117
118impl EchoHandle {
119    pub async fn echo(&self, msg: String) -> CoreResult<String> {
120        let _ = self.sender.send(msg).await;
121        println!("In side echo handle, waiting for response...");
122        Ok(self.receiver.recv().await?)
123    }
124}
125
126// --- ApiModule 2: StreamModule ---
127pub struct StreamModule {
128    msg_rx: AsyncReceiver<Arc<Message>>,
129    cmd_rx: AsyncReceiver<bool>,
130    cmd_tx: AsyncSender<String>,
131    send: AtomicBool,
132}
133
134#[async_trait]
135impl ApiModule<()> for StreamModule {
136    type Command = bool;
137    type CommandResponse = String;
138    type Handle = StreamHandle;
139
140    fn new(
141        _state: Arc<()>,
142        cmd_rx: AsyncReceiver<Self::Command>,
143        cmd_ret_tx: AsyncSender<Self::CommandResponse>,
144        msg_rx: AsyncReceiver<Arc<Message>>,
145        _to_ws: AsyncSender<Message>,
146        _: AsyncSender<RunnerCommand>,
147    ) -> Self {
148        Self {
149            msg_rx,
150            cmd_tx: cmd_ret_tx,
151            cmd_rx,
152            send: AtomicBool::new(false),
153        }
154    }
155
156    fn create_handle(
157        sender: AsyncSender<Self::Command>,
158        receiver: AsyncReceiver<Self::CommandResponse>,
159    ) -> Self::Handle {
160        StreamHandle { sender, receiver }
161    }
162
163    async fn run(&mut self) -> CoreResult<()> {
164        loop {
165            tokio::select! {
166                Ok(cmd) = self.cmd_rx.recv() => {
167                    // Update the send flag based on the received command
168                    self.send.store(cmd, Ordering::SeqCst);
169                }
170                Ok(msg) = self.msg_rx.recv() => {
171                    if let Message::Text(txt) = &*msg {
172                        if self.send.load(Ordering::SeqCst) {
173                            // Process the message if send is true
174                            println!("[StreamModule] Received: {txt}");
175                            let _ = self.cmd_tx.send(txt.to_string()).await;
176                        }
177                    }
178                }
179                else => {
180                    println!("[Error] StreamModule: Channel closed");
181                },
182            }
183        }
184    }
185
186    fn rule(_: Arc<()>) -> Box<dyn Rule + Send + Sync> {
187        Box::new(move |_msg: &Message| {
188            // Accept all messages
189            true
190        })
191    }
192}
193
194#[derive(Clone)]
195pub struct StreamHandle {
196    receiver: AsyncReceiver<String>,
197    sender: AsyncSender<bool>,
198}
199
200impl StreamHandle {
201    pub async fn stream(self) -> CoreResult<impl Stream<Item = CoreResult<String>>> {
202        self.sender.send(true).await?;
203        println!("StreamHandle: Waiting for messages...");
204        Ok(Box::pin(unfold(self.receiver, |state| async move {
205            let item = state.recv().await.map_err(CoreError::from);
206            Some((item, state))
207        })))
208    }
209}
210
211// --- ApiModule 3: PeriodicSenderModule ---
212pub struct PeriodicSenderModule {
213    cmd_rx: AsyncReceiver<bool>,
214    to_ws: AsyncSender<Message>,
215    running: AtomicBool,
216}
217
218#[async_trait]
219impl ApiModule<()> for PeriodicSenderModule {
220    type Command = bool; // true = start, false = stop
221    type CommandResponse = ();
222    type Handle = PeriodicSenderHandle;
223
224    fn new(
225        _state: Arc<()>,
226        cmd_rx: AsyncReceiver<Self::Command>,
227        _cmd_ret_tx: AsyncSender<Self::CommandResponse>,
228        _msg_rx: AsyncReceiver<Arc<Message>>,
229        to_ws: AsyncSender<Message>,
230        _: AsyncSender<RunnerCommand>,
231    ) -> Self {
232        Self {
233            cmd_rx,
234            to_ws,
235            running: AtomicBool::new(false),
236        }
237    }
238
239    fn create_handle(
240        sender: AsyncSender<Self::Command>,
241        _receiver: AsyncReceiver<Self::CommandResponse>,
242    ) -> Self::Handle {
243        PeriodicSenderHandle { sender }
244    }
245
246    async fn run(&mut self) -> CoreResult<()> {
247        let to_ws = self.to_ws.clone();
248        let mut interval = tokio::time::interval(Duration::from_secs(5));
249        loop {
250            tokio::select! {
251                Ok(cmd) = self.cmd_rx.recv() => {
252                    self.running.store(cmd, Ordering::SeqCst);
253                }
254                _ = interval.tick() => {
255                    if self.running.load(Ordering::SeqCst) {
256                        let _ = to_ws.send(Message::text("Ping from periodic sender")).await;
257                    }
258                }
259            }
260        }
261    }
More examples
Hide additional examples
examples/testing_echo_client.rs (line 82)
78    async fn run(&mut self) -> CoreResult<()> {
79        loop {
80            tokio::select! {
81                Ok(cmd) = self.cmd_rx.recv() => {
82                    let _ = self.to_ws.send(Message::text(cmd)).await;
83                    self.echo.store(true, Ordering::SeqCst);
84                }
85                Ok(msg) = self.msg_rx.recv() => {
86                    if let Message::Text(txt) = &*msg {
87                        if self.echo.load(Ordering::SeqCst) {
88                            let _ = self.cmd_tx.send(txt.to_string()).await;
89                            self.echo.store(false, Ordering::SeqCst);
90                        }
91                    }
92                }
93            }
94        }
95    }
Source

pub fn binary<B>(bin: B) -> Message
where B: Into<Bytes>,

Create a new binary WebSocket message by converting to Bytes.

Source

pub fn is_text(&self) -> bool

Indicates whether a message is a text message.

Examples found in repository?
examples/echo_client.rs (line 108)
107    fn rule(_: Arc<()>) -> Box<dyn Rule + Send + Sync> {
108        Box::new(move |msg: &Message| msg.is_text())
109    }
More examples
Hide additional examples
examples/testing_echo_client.rs (line 100)
97    fn rule(_: Arc<()>) -> Box<dyn Rule + Send + Sync> {
98        Box::new(move |msg: &Message| {
99            println!("Routing rule for EchoModule: {msg:?}");
100            msg.is_text()
101        })
102    }
Source

pub fn is_binary(&self) -> bool

Indicates whether a message is a binary message.

Source

pub fn is_ping(&self) -> bool

Indicates whether a message is a ping message.

Source

pub fn is_pong(&self) -> bool

Indicates whether a message is a pong message.

Source

pub fn is_close(&self) -> bool

Indicates whether a message is a close message.

Source

pub fn len(&self) -> usize

Get the length of the WebSocket message.

Source

pub fn is_empty(&self) -> bool

Returns true if the WebSocket message has no content. For example, if the other side of the connection sent an empty string.

Source

pub fn into_data(self) -> Bytes

Consume the WebSocket and return it as binary data.

Source

pub fn into_text(self) -> Result<Utf8Bytes, Error>

Attempt to consume the WebSocket message and convert it to a String.

Source

pub fn to_text(&self) -> Result<&str, Error>

Attempt to get a &str from the WebSocket message, this will try to convert binary data to utf8.

Trait Implementations§

Source§

impl Clone for Message

Source§

fn clone(&self) -> Message

Returns a duplicate of the value. Read more
1.0.0 · Source§

fn clone_from(&mut self, source: &Self)

Performs copy-assignment from source. Read more
Source§

impl Debug for Message

Source§

fn fmt(&self, f: &mut Formatter<'_>) -> Result<(), Error>

Formats the value using the given formatter. Read more
Source§

impl Display for Message

Source§

fn fmt(&self, f: &mut Formatter<'_>) -> Result<(), Error>

Formats the value using the given formatter. Read more
Source§

impl<'b> From<&'b [u8]> for Message

Source§

fn from(data: &'b [u8]) -> Message

Converts to this type from the input type.
Source§

impl<'s> From<&'s str> for Message

Source§

fn from(string: &'s str) -> Message

Converts to this type from the input type.
Source§

impl From<Bytes> for Message

Source§

fn from(data: Bytes) -> Message

Converts to this type from the input type.
Source§

impl From<Message> for Bytes

Source§

fn from(message: Message) -> Bytes

Converts to this type from the input type.
Source§

impl From<String> for Message

Source§

fn from(string: String) -> Message

Converts to this type from the input type.
Source§

impl From<Vec<u8>> for Message

Source§

fn from(data: Vec<u8>) -> Message

Converts to this type from the input type.
Source§

impl PartialEq for Message

Source§

fn eq(&self, other: &Message) -> bool

Tests for self and other values to be equal, and is used by ==.
1.0.0 · Source§

fn ne(&self, other: &Rhs) -> bool

Tests for !=. The default implementation is almost always sufficient, and should not be overridden without very good reason.
Source§

impl<T> Sink<Message> for WebSocketStream<T>
where T: AsyncRead + AsyncWrite + Unpin,

Source§

type Error = Error

The type of value produced by the sink when an error occurs.
Source§

fn poll_ready( self: Pin<&mut WebSocketStream<T>>, cx: &mut Context<'_>, ) -> Poll<Result<(), <WebSocketStream<T> as Sink<Message>>::Error>>

Attempts to prepare the Sink to receive a value. Read more
Source§

fn start_send( self: Pin<&mut WebSocketStream<T>>, item: Message, ) -> Result<(), <WebSocketStream<T> as Sink<Message>>::Error>

Begin the process of sending a value to the sink. Each call to this function must be preceded by a successful call to poll_ready which returned Poll::Ready(Ok(())). Read more
Source§

fn poll_flush( self: Pin<&mut WebSocketStream<T>>, cx: &mut Context<'_>, ) -> Poll<Result<(), <WebSocketStream<T> as Sink<Message>>::Error>>

Flush any remaining output from this sink. Read more
Source§

fn poll_close( self: Pin<&mut WebSocketStream<T>>, cx: &mut Context<'_>, ) -> Poll<Result<(), <WebSocketStream<T> as Sink<Message>>::Error>>

Flush any remaining output and close this sink, if necessary. Read more
Source§

impl Eq for Message

Source§

impl StructuralPartialEq for Message

Auto Trait Implementations§

Blanket Implementations§

Source§

impl<T> Any for T
where T: 'static + ?Sized,

Source§

fn type_id(&self) -> TypeId

Gets the TypeId of self. Read more
Source§

impl<T> Borrow<T> for T
where T: ?Sized,

Source§

fn borrow(&self) -> &T

Immutably borrows from an owned value. Read more
Source§

impl<T> BorrowMut<T> for T
where T: ?Sized,

Source§

fn borrow_mut(&mut self) -> &mut T

Mutably borrows from an owned value. Read more
Source§

impl<T> CloneToUninit for T
where T: Clone,

Source§

unsafe fn clone_to_uninit(&self, dest: *mut u8)

🔬This is a nightly-only experimental API. (clone_to_uninit)
Performs copy-assignment from self to dest. Read more
Source§

impl<T> From<T> for T

Source§

fn from(t: T) -> T

Returns the argument unchanged.

Source§

impl<T> Instrument for T

Source§

fn instrument(self, span: Span) -> Instrumented<Self>

Instruments this type with the provided Span, returning an Instrumented wrapper. Read more
Source§

fn in_current_span(self) -> Instrumented<Self>

Instruments this type with the current Span, returning an Instrumented wrapper. Read more
Source§

impl<T, U> Into<U> for T
where U: From<T>,

Source§

fn into(self) -> U

Calls U::from(self).

That is, this conversion is whatever the implementation of From<T> for U chooses to do.

Source§

impl<T> Same for T

Source§

type Output = T

Should always be Self
Source§

impl<T> ToOwned for T
where T: Clone,

Source§

type Owned = T

The resulting type after obtaining ownership.
Source§

fn to_owned(&self) -> T

Creates owned data from borrowed data, usually by cloning. Read more
Source§

fn clone_into(&self, target: &mut T)

Uses borrowed data to replace owned data, usually by cloning. Read more
Source§

impl<T> ToString for T
where T: Display + ?Sized,

Source§

fn to_string(&self) -> String

Converts the given value to a String. Read more
Source§

impl<T, U> TryFrom<U> for T
where U: Into<T>,

Source§

type Error = Infallible

The type returned in the event of a conversion error.
Source§

fn try_from(value: U) -> Result<T, <T as TryFrom<U>>::Error>

Performs the conversion.
Source§

impl<T, U> TryInto<U> for T
where U: TryFrom<T>,

Source§

type Error = <U as TryFrom<T>>::Error

The type returned in the event of a conversion error.
Source§

fn try_into(self) -> Result<U, <U as TryFrom<T>>::Error>

Performs the conversion.
Source§

impl<V, T> VZip<V> for T
where V: MultiLane<T>,

Source§

fn vzip(self) -> V

Source§

impl<T> WithSubscriber for T

Source§

fn with_subscriber<S>(self, subscriber: S) -> WithDispatch<Self>
where S: Into<Dispatch>,

Attaches the provided Subscriber to this type, returning a WithDispatch wrapper. Read more
Source§

fn with_current_subscriber(self) -> WithDispatch<Self>

Attaches the current default Subscriber to this type, returning a WithDispatch wrapper. Read more