mini_redis/cmd/
publish.rs

1use crate::{Connection, Db, Frame, Parse};
2
3use bytes::Bytes;
4
5/// Posts a message to the given channel.
6///
7/// Send a message into a channel without any knowledge of individual consumers.
8/// Consumers may subscribe to channels in order to receive the messages.
9///
10/// Channel names have no relation to the key-value namespace. Publishing on a
11/// channel named "foo" has no relation to setting the "foo" key.
12#[derive(Debug)]
13pub struct Publish {
14    /// Name of the channel on which the message should be published.
15    channel: String,
16
17    /// The message to publish.
18    message: Bytes,
19}
20
21impl Publish {
22    /// Create a new `Publish` command which sends `message` on `channel`.
23    pub(crate) fn new(channel: impl ToString, message: Bytes) -> Publish {
24        Publish {
25            channel: channel.to_string(),
26            message,
27        }
28    }
29
30    /// Parse a `Publish` instance from a received frame.
31    ///
32    /// The `Parse` argument provides a cursor-like API to read fields from the
33    /// `Frame`. At this point, the entire frame has already been received from
34    /// the socket.
35    ///
36    /// The `PUBLISH` string has already been consumed.
37    ///
38    /// # Returns
39    ///
40    /// On success, the `Publish` value is returned. If the frame is malformed,
41    /// `Err` is returned.
42    ///
43    /// # Format
44    ///
45    /// Expects an array frame containing three entries.
46    ///
47    /// ```text
48    /// PUBLISH channel message
49    /// ```
50    pub(crate) fn parse_frames(parse: &mut Parse) -> crate::Result<Publish> {
51        // The `PUBLISH` string has already been consumed. Extract the `channel`
52        // and `message` values from the frame.
53        //
54        // The `channel` must be a valid string.
55        let channel = parse.next_string()?;
56
57        // The `message` is arbitrary bytes.
58        let message = parse.next_bytes()?;
59
60        Ok(Publish { channel, message })
61    }
62
63    /// Apply the `Publish` command to the specified `Db` instance.
64    ///
65    /// The response is written to `dst`. This is called by the server in order
66    /// to execute a received command.
67    pub(crate) async fn apply(self, db: &Db, dst: &mut Connection) -> crate::Result<()> {
68        // The shared state contains the `tokio::sync::broadcast::Sender` for
69        // all active channels. Calling `db.publish` dispatches the message into
70        // the appropriate channel.
71        //
72        // The number of subscribers currently listening on the channel is
73        // returned. This does not mean that `num_subscriber` channels will
74        // receive the message. Subscribers may drop before receiving the
75        // message. Given this, `num_subscribers` should only be used as a
76        // "hint".
77        let num_subscribers = db.publish(&self.channel, self.message);
78
79        // The number of subscribers is returned as the response to the publish
80        // request.
81        let response = Frame::Integer(num_subscribers as u64);
82
83        // Write the frame to the client.
84        dst.write_frame(&response).await?;
85
86        Ok(())
87    }
88
89    /// Converts the command into an equivalent `Frame`.
90    ///
91    /// This is called by the client when encoding a `Publish` command to send
92    /// to the server.
93    pub(crate) fn into_frame(self) -> Frame {
94        let mut frame = Frame::array();
95        frame.push_bulk(Bytes::from("publish".as_bytes()));
96        frame.push_bulk(Bytes::from(self.channel.into_bytes()));
97        frame.push_bulk(self.message);
98
99        frame
100    }
101}