mini_redis/cmd/publish.rs
1use crate::{Connection, Db, Frame, Parse};
2
3use bytes::Bytes;
4
5/// Posts a message to the given channel.
6///
7/// Send a message into a channel without any knowledge of individual consumers.
8/// Consumers may subscribe to channels in order to receive the messages.
9///
10/// Channel names have no relation to the key-value namespace. Publishing on a
11/// channel named "foo" has no relation to setting the "foo" key.
12#[derive(Debug)]
13pub struct Publish {
14 /// Name of the channel on which the message should be published.
15 channel: String,
16
17 /// The message to publish.
18 message: Bytes,
19}
20
21impl Publish {
22 /// Create a new `Publish` command which sends `message` on `channel`.
23 pub(crate) fn new(channel: impl ToString, message: Bytes) -> Publish {
24 Publish {
25 channel: channel.to_string(),
26 message,
27 }
28 }
29
30 /// Parse a `Publish` instance from a received frame.
31 ///
32 /// The `Parse` argument provides a cursor-like API to read fields from the
33 /// `Frame`. At this point, the entire frame has already been received from
34 /// the socket.
35 ///
36 /// The `PUBLISH` string has already been consumed.
37 ///
38 /// # Returns
39 ///
40 /// On success, the `Publish` value is returned. If the frame is malformed,
41 /// `Err` is returned.
42 ///
43 /// # Format
44 ///
45 /// Expects an array frame containing three entries.
46 ///
47 /// ```text
48 /// PUBLISH channel message
49 /// ```
50 pub(crate) fn parse_frames(parse: &mut Parse) -> crate::Result<Publish> {
51 // The `PUBLISH` string has already been consumed. Extract the `channel`
52 // and `message` values from the frame.
53 //
54 // The `channel` must be a valid string.
55 let channel = parse.next_string()?;
56
57 // The `message` is arbitrary bytes.
58 let message = parse.next_bytes()?;
59
60 Ok(Publish { channel, message })
61 }
62
63 /// Apply the `Publish` command to the specified `Db` instance.
64 ///
65 /// The response is written to `dst`. This is called by the server in order
66 /// to execute a received command.
67 pub(crate) async fn apply(self, db: &Db, dst: &mut Connection) -> crate::Result<()> {
68 // The shared state contains the `tokio::sync::broadcast::Sender` for
69 // all active channels. Calling `db.publish` dispatches the message into
70 // the appropriate channel.
71 //
72 // The number of subscribers currently listening on the channel is
73 // returned. This does not mean that `num_subscriber` channels will
74 // receive the message. Subscribers may drop before receiving the
75 // message. Given this, `num_subscribers` should only be used as a
76 // "hint".
77 let num_subscribers = db.publish(&self.channel, self.message);
78
79 // The number of subscribers is returned as the response to the publish
80 // request.
81 let response = Frame::Integer(num_subscribers as u64);
82
83 // Write the frame to the client.
84 dst.write_frame(&response).await?;
85
86 Ok(())
87 }
88
89 /// Converts the command into an equivalent `Frame`.
90 ///
91 /// This is called by the client when encoding a `Publish` command to send
92 /// to the server.
93 pub(crate) fn into_frame(self) -> Frame {
94 let mut frame = Frame::array();
95 frame.push_bulk(Bytes::from("publish".as_bytes()));
96 frame.push_bulk(Bytes::from(self.channel.into_bytes()));
97 frame.push_bulk(self.message);
98
99 frame
100 }
101}