Message

Enum Message 

Source
pub enum Message {
    Text(Utf8Bytes),
    Binary(Bytes),
    Ping(Bytes),
    Pong(Bytes),
    Close(Option<CloseFrame>),
    Frame(Frame),
}
Expand description

An enum representing the various forms of a WebSocket message.

Variants§

§

Text(Utf8Bytes)

A text WebSocket message

§

Binary(Bytes)

A binary WebSocket message

§

Ping(Bytes)

A ping message with the specified payload

The payload here must be at most 125 bytes long, the maximum payload size WebSocket allows for control frames.

§

Pong(Bytes)

A pong message with the specified payload

The payload here must be at most 125 bytes long, the maximum payload size WebSocket allows for control frames.

§

Close(Option<CloseFrame>)

A close message with the optional close frame.

§

Frame(Frame)

Raw frame. Note that you will not receive this variant when reading messages.

Implementations§

Source§

impl Message

Source

pub fn text<S>(string: S) -> Message
where S: Into<Utf8Bytes>,

Create a new text WebSocket message from any value that can be converted into Utf8Bytes.

Examples found in repository?
examples/echo_client.rs (line 91)
87    async fn run(&mut self) -> CoreResult<()> {
88        loop {
89            tokio::select! {
90                Ok(cmd) = self.cmd_rx.recv() => {
91                    let _ = self.to_ws.send(Message::text(cmd)).await;
92                    self.echo.store(true, Ordering::SeqCst);
93                }
94                Ok(msg) = self.msg_rx.recv() => {
95                    if let Message::Text(txt) = &*msg && self.echo.load(Ordering::SeqCst) {
96                        let _ = self.cmd_tx.send(txt.to_string()).await;
97                        self.echo.store(false, Ordering::SeqCst);
98                    }
99                }
100            }
101        }
102    }
103
104    fn rule(_: Arc<()>) -> Box<dyn Rule + Send + Sync> {
105        Box::new(move |msg: &Message| msg.is_text())
106    }
107}
108
109#[derive(Clone)]
110pub struct EchoHandle {
111    sender: AsyncSender<String>,
112    receiver: AsyncReceiver<String>,
113}
114
115impl EchoHandle {
116    pub async fn echo(&self, msg: String) -> CoreResult<String> {
117        let _ = self.sender.send(msg).await;
118        println!("In side echo handle, waiting for response...");
119        Ok(self.receiver.recv().await?)
120    }
121}
122
123// --- ApiModule 2: StreamModule ---
124pub struct StreamModule {
125    msg_rx: AsyncReceiver<Arc<Message>>,
126    cmd_rx: AsyncReceiver<bool>,
127    cmd_tx: AsyncSender<String>,
128    send: AtomicBool,
129}
130
131#[async_trait]
132impl ApiModule<()> for StreamModule {
133    type Command = bool;
134    type CommandResponse = String;
135    type Handle = StreamHandle;
136
137    fn new(
138        _state: Arc<()>,
139        cmd_rx: AsyncReceiver<Self::Command>,
140        cmd_ret_tx: AsyncSender<Self::CommandResponse>,
141        msg_rx: AsyncReceiver<Arc<Message>>,
142        _to_ws: AsyncSender<Message>,
143    ) -> Self {
144        Self {
145            msg_rx,
146            cmd_tx: cmd_ret_tx,
147            cmd_rx,
148            send: AtomicBool::new(false),
149        }
150    }
151
152    fn create_handle(
153        sender: AsyncSender<Self::Command>,
154        receiver: AsyncReceiver<Self::CommandResponse>,
155    ) -> Self::Handle {
156        StreamHandle { sender, receiver }
157    }
158
159    async fn run(&mut self) -> CoreResult<()> {
160        loop {
161            tokio::select! {
162                Ok(cmd) = self.cmd_rx.recv() => {
163                    // Update the send flag based on the received command
164                    self.send.store(cmd, Ordering::SeqCst);
165                }
166                Ok(msg) = self.msg_rx.recv() => {
167                    if let Message::Text(txt) = &*msg
168                        && self.send.load(Ordering::SeqCst) {
169                            // Process the message if send is true
170                            println!("[StreamModule] Received: {txt}");
171                            let _ = self.cmd_tx.send(txt.to_string()).await;
172                        }
173                }
174                else => {
175                    println!("[Error] StreamModule: Channel closed");
176                },
177            }
178        }
179    }
180
181    fn rule(_: Arc<()>) -> Box<dyn Rule + Send + Sync> {
182        Box::new(move |_msg: &Message| {
183            // Accept all messages
184            true
185        })
186    }
187}
188
189#[derive(Clone)]
190pub struct StreamHandle {
191    receiver: AsyncReceiver<String>,
192    sender: AsyncSender<bool>,
193}
194
195impl StreamHandle {
196    pub async fn stream(self) -> CoreResult<impl Stream<Item = CoreResult<String>>> {
197        self.sender.send(true).await?;
198        println!("StreamHandle: Waiting for messages...");
199        Ok(Box::pin(unfold(self.receiver, |state| async move {
200            let item = state.recv().await.map_err(CoreError::from);
201            Some((item, state))
202        })))
203    }
204}
205
206// --- ApiModule 3: PeriodicSenderModule ---
207pub struct PeriodicSenderModule {
208    cmd_rx: AsyncReceiver<bool>,
209    to_ws: AsyncSender<Message>,
210    running: AtomicBool,
211}
212
213#[async_trait]
214impl ApiModule<()> for PeriodicSenderModule {
215    type Command = bool; // true = start, false = stop
216    type CommandResponse = ();
217    type Handle = PeriodicSenderHandle;
218
219    fn new(
220        _state: Arc<()>,
221        cmd_rx: AsyncReceiver<Self::Command>,
222        _cmd_ret_tx: AsyncSender<Self::CommandResponse>,
223        _msg_rx: AsyncReceiver<Arc<Message>>,
224        to_ws: AsyncSender<Message>,
225    ) -> Self {
226        Self {
227            cmd_rx,
228            to_ws,
229            running: AtomicBool::new(false),
230        }
231    }
232
233    fn create_handle(
234        sender: AsyncSender<Self::Command>,
235        _receiver: AsyncReceiver<Self::CommandResponse>,
236    ) -> Self::Handle {
237        PeriodicSenderHandle { sender }
238    }
239
240    async fn run(&mut self) -> CoreResult<()> {
241        let to_ws = self.to_ws.clone();
242        let mut interval = tokio::time::interval(Duration::from_secs(5));
243        loop {
244            tokio::select! {
245                Ok(cmd) = self.cmd_rx.recv() => {
246                    self.running.store(cmd, Ordering::SeqCst);
247                }
248                _ = interval.tick() => {
249                    if self.running.load(Ordering::SeqCst) {
250                        let _ = to_ws.send(Message::text("Ping from periodic sender")).await;
251                    }
252                }
253            }
254        }
255    }
More examples
Hide additional examples
examples/testing_echo_client.rs (line 81)
77    async fn run(&mut self) -> CoreResult<()> {
78        loop {
79            tokio::select! {
80                Ok(cmd) = self.cmd_rx.recv() => {
81                    let _ = self.to_ws.send(Message::text(cmd)).await;
82                    self.echo.store(true, Ordering::SeqCst);
83                }
84                Ok(msg) = self.msg_rx.recv() => {
85                    if let Message::Text(txt) = &*msg && self.echo.load(Ordering::SeqCst) {
86                        let _ = self.cmd_tx.send(txt.to_string()).await;
87                        self.echo.store(false, Ordering::SeqCst);
88                    }
89                }
90            }
91        }
92    }
Source

pub fn binary<B>(bin: B) -> Message
where B: Into<Bytes>,

Create a new binary WebSocket message by converting to Bytes.

Source

pub fn is_text(&self) -> bool

Indicates whether a message is a text message.

Examples found in repository?
examples/echo_client.rs (line 105)
104    fn rule(_: Arc<()>) -> Box<dyn Rule + Send + Sync> {
105        Box::new(move |msg: &Message| msg.is_text())
106    }
More examples
Hide additional examples
examples/testing_echo_client.rs (line 97)
94    fn rule(_: Arc<()>) -> Box<dyn Rule + Send + Sync> {
95        Box::new(move |msg: &Message| {
96            println!("Routing rule for EchoModule: {msg:?}");
97            msg.is_text()
98        })
99    }
Source

pub fn is_binary(&self) -> bool

Indicates whether a message is a binary message.

Source

pub fn is_ping(&self) -> bool

Indicates whether a message is a ping message.

Source

pub fn is_pong(&self) -> bool

Indicates whether a message is a pong message.

Source

pub fn is_close(&self) -> bool

Indicates whether a message is a close message.

Source

pub fn len(&self) -> usize

Get the length of the WebSocket message.

Source

pub fn is_empty(&self) -> bool

Returns true if the WebSocket message has no content, for example if the other side of the connection sent an empty string.

Source

pub fn into_data(self) -> Bytes

Consume the WebSocket message and return it as binary data.

Source

pub fn into_text(self) -> Result<Utf8Bytes, Error>

Attempt to consume the WebSocket message and convert it to UTF-8 text (Utf8Bytes), returning an Error if the conversion fails.

Source

pub fn to_text(&self) -> Result<&str, Error>

Attempt to get a &str from the WebSocket message; this will try to interpret binary data as UTF-8.

Trait Implementations§

Source§

impl Clone for Message

Source§

fn clone(&self) -> Message

Returns a duplicate of the value. Read more
1.0.0 · Source§

fn clone_from(&mut self, source: &Self)

Performs copy-assignment from source. Read more
Source§

impl Debug for Message

Source§

fn fmt(&self, f: &mut Formatter<'_>) -> Result<(), Error>

Formats the value using the given formatter. Read more
Source§

impl Display for Message

Source§

fn fmt(&self, f: &mut Formatter<'_>) -> Result<(), Error>

Formats the value using the given formatter. Read more
Source§

impl<'b> From<&'b [u8]> for Message

Source§

fn from(data: &'b [u8]) -> Message

Converts to this type from the input type.
Source§

impl<'s> From<&'s str> for Message

Source§

fn from(string: &'s str) -> Message

Converts to this type from the input type.
Source§

impl From<Bytes> for Message

Source§

fn from(data: Bytes) -> Message

Converts to this type from the input type.
Source§

impl From<Message> for Bytes

Source§

fn from(message: Message) -> Bytes

Converts to this type from the input type.
Source§

impl From<String> for Message

Source§

fn from(string: String) -> Message

Converts to this type from the input type.
Source§

impl From<Vec<u8>> for Message

Source§

fn from(data: Vec<u8>) -> Message

Converts to this type from the input type.
Source§

impl PartialEq for Message

Source§

fn eq(&self, other: &Message) -> bool

Tests for self and other values to be equal, and is used by ==.
1.0.0 · Source§

fn ne(&self, other: &Rhs) -> bool

Tests for !=. The default implementation is almost always sufficient, and should not be overridden without very good reason.
Source§

impl<T> Sink<Message> for WebSocketStream<T>
where T: AsyncRead + AsyncWrite + Unpin,

Source§

type Error = Error

The type of value produced by the sink when an error occurs.
Source§

fn poll_ready( self: Pin<&mut WebSocketStream<T>>, cx: &mut Context<'_>, ) -> Poll<Result<(), <WebSocketStream<T> as Sink<Message>>::Error>>

Attempts to prepare the Sink to receive a value. Read more
Source§

fn start_send( self: Pin<&mut WebSocketStream<T>>, item: Message, ) -> Result<(), <WebSocketStream<T> as Sink<Message>>::Error>

Begin the process of sending a value to the sink. Each call to this function must be preceded by a successful call to poll_ready which returned Poll::Ready(Ok(())). Read more
Source§

fn poll_flush( self: Pin<&mut WebSocketStream<T>>, cx: &mut Context<'_>, ) -> Poll<Result<(), <WebSocketStream<T> as Sink<Message>>::Error>>

Flush any remaining output from this sink. Read more
Source§

fn poll_close( self: Pin<&mut WebSocketStream<T>>, cx: &mut Context<'_>, ) -> Poll<Result<(), <WebSocketStream<T> as Sink<Message>>::Error>>

Flush any remaining output and close this sink, if necessary. Read more
Source§

impl Eq for Message

Source§

impl StructuralPartialEq for Message

Auto Trait Implementations§

Blanket Implementations§

Source§

impl<T> Any for T
where T: 'static + ?Sized,

Source§

fn type_id(&self) -> TypeId

Gets the TypeId of self. Read more
Source§

impl<T> Borrow<T> for T
where T: ?Sized,

Source§

fn borrow(&self) -> &T

Immutably borrows from an owned value. Read more
Source§

impl<T> BorrowMut<T> for T
where T: ?Sized,

Source§

fn borrow_mut(&mut self) -> &mut T

Mutably borrows from an owned value. Read more
Source§

impl<T> CloneToUninit for T
where T: Clone,

Source§

unsafe fn clone_to_uninit(&self, dest: *mut u8)

🔬This is a nightly-only experimental API. (clone_to_uninit)
Performs copy-assignment from self to dest. Read more
Source§

impl<T> From<T> for T

Source§

fn from(t: T) -> T

Returns the argument unchanged.

Source§

impl<T> Instrument for T

Source§

fn instrument(self, span: Span) -> Instrumented<Self>

Instruments this type with the provided Span, returning an Instrumented wrapper. Read more
Source§

fn in_current_span(self) -> Instrumented<Self>

Instruments this type with the current Span, returning an Instrumented wrapper. Read more
Source§

impl<T, U> Into<U> for T
where U: From<T>,

Source§

fn into(self) -> U

Calls U::from(self).

That is, this conversion is whatever the implementation of From<T> for U chooses to do.

Source§

impl<T> Same for T

Source§

type Output = T

Should always be Self
Source§

impl<T> ToOwned for T
where T: Clone,

Source§

type Owned = T

The resulting type after obtaining ownership.
Source§

fn to_owned(&self) -> T

Creates owned data from borrowed data, usually by cloning. Read more
Source§

fn clone_into(&self, target: &mut T)

Uses borrowed data to replace owned data, usually by cloning. Read more
Source§

impl<T> ToString for T
where T: Display + ?Sized,

Source§

fn to_string(&self) -> String

Converts the given value to a String. Read more
Source§

impl<T, U> TryFrom<U> for T
where U: Into<T>,

Source§

type Error = Infallible

The type returned in the event of a conversion error.
Source§

fn try_from(value: U) -> Result<T, <T as TryFrom<U>>::Error>

Performs the conversion.
Source§

impl<T, U> TryInto<U> for T
where U: TryFrom<T>,

Source§

type Error = <U as TryFrom<T>>::Error

The type returned in the event of a conversion error.
Source§

fn try_into(self) -> Result<U, <U as TryFrom<T>>::Error>

Performs the conversion.
Source§

impl<V, T> VZip<V> for T
where V: MultiLane<T>,

Source§

fn vzip(self) -> V

Source§

impl<T> WithSubscriber for T

Source§

fn with_subscriber<S>(self, subscriber: S) -> WithDispatch<Self>
where S: Into<Dispatch>,

Attaches the provided Subscriber to this type, returning a WithDispatch wrapper. Read more
Source§

fn with_current_subscriber(self) -> WithDispatch<Self>

Attaches the current default Subscriber to this type, returning a WithDispatch wrapper. Read more