socket_flow/
connection.rs

1use crate::error::Error;
2use crate::message::Message;
3use crate::split::{WSReader, WSWriter};
4use futures::Stream;
5use std::pin::Pin;
6use std::task::{Context, Poll};
7
8/// WSConnection represents the final connection of a client/server, after all the steps
9/// of establishing a connection have been properly met.
10/// This structure will be delivered to the end-user, which contains the reader, which would be used
11/// as a stream,
12/// for reading all the data that comes in this connection, and the writer, for writing
13/// data into the stream.
14pub struct WSConnection {
15    /// Represents the writer side of the connection, where the end-user can send data over the connection
16    /// with a different set methods
17    writer: WSWriter,
18    /// Implements futures::Stream,
19    /// so the end-user can process all the incoming messages, using .next() method
20    reader: WSReader,
21}
22
23// WSConnection has the reader attribute, which is already a ReceiverStream
24// Although, we don't want this attribute visible to the end-user.
25// Therefore, implementing Stream for this struct is necessary, so end-user could
26// invoke next() and other stream methods directly from a variable that holds this struct.
27impl Stream for WSConnection {
28    type Item = Result<Message, Error>;
29
30    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
31        // We need to get a mutable reference to the inner field
32        let this = self.get_mut();
33
34        // Delegate the polling to `reader`
35        // We need to pin `reader` because its `poll_next` method requires the object to be pinned
36        Pin::new(&mut this.reader).poll_next(cx)
37    }
38}
39
40impl WSConnection {
41    pub fn new(writer: WSWriter, reader: WSReader) -> Self {
42        Self { writer, reader }
43    }
44
45    /// This function will split the connection into the `WSReader`, which is a stream of messages
46    /// and `WSWriter`, for writing data into the socket.
47    /// It's a good option when you need to work with both in separate tasks or functions
48    pub fn split(self) -> (WSReader, WSWriter) {
49        (self.reader, self.writer)
50    }
51
52    /// This function will be used for closing the connection between two instances, mainly it will
53    /// be used by a client,
54    /// to request disconnection with a server.It first sends a close frame
55    /// through the socket, and waits until it receives the confirmation in a channel
56    /// executing it inside a timeout, to avoid a long waiting time
57    pub async fn close_connection(&mut self) -> Result<(), Error> {
58        self.writer.close_connection().await
59    }
60
61    /// Send a general message, which is a good option for echoing messages
62    pub async fn send_message(&mut self, message: Message) -> Result<(), Error> {
63        self.writer.send_message(message).await
64    }
65
66    /// Send generic data, by default it considers OpCode Text
67    pub async fn send(&mut self, data: Vec<u8>) -> Result<(), Error> {
68        self.writer.send(data).await
69    }
70
71    /// Send a message as Binary Opcode
72    pub async fn send_as_binary(&mut self, data: Vec<u8>) -> Result<(), Error> {
73        self.writer.send_as_binary(data).await
74    }
75
76    /// Send a message as a String
77    pub async fn send_as_text(&mut self, data: String) -> Result<(), Error> {
78        self.writer.send_as_text(data).await
79    }
80
81    /// Sends a Ping OpCode to client/server
82    pub async fn send_ping(&mut self) -> Result<(), Error> {
83        self.writer.send_ping().await
84    }
85
86    /// Send data fragmented, where fragment_size should be a value calculated in powers of 2
87    /// The payload would be divided into that size, still considering connection configurations
88    /// like max_frame_size
89    pub async fn send_large_data_fragmented(
90        &mut self,
91        data: Vec<u8>,
92        fragment_size: usize,
93    ) -> Result<(), Error> {
94        self.writer
95            .send_large_data_fragmented(data, fragment_size)
96            .await
97    }
98}