mini_redis/
buffer.rs

1use crate::client::Client;
2use crate::Result;
3
4use bytes::Bytes;
5use tokio::sync::mpsc::{channel, Receiver, Sender};
6use tokio::sync::oneshot;
7
8/// Create a new client request buffer
9///
10/// The `Client` performs Redis commands directly on the TCP connection. Only a
11/// single request may be in-flight at a given time and operations require
12/// mutable access to the `Client` handle. This prevents using a single Redis
13/// connection from multiple Tokio tasks.
14///
15/// The strategy for dealing with this class of problem is to spawn a dedicated
16/// Tokio task to manage the Redis connection and using "message passing" to
17/// operate on the connection. Commands are pushed into a channel. The
18/// connection task pops commands off of the channel and applies them to the
19/// Redis connection. When the response is received, it is forwarded to the
20/// original requester.
21///
22/// The returned `Buffer` handle may be cloned before passing the new handle to
23/// separate tasks.
24pub fn buffer(client: Client) -> Buffer {
25    // Setting the message limit to a hard coded value of 32. in a real-app, the
26    // buffer size should be configurable, but we don't need to do that here.
27    let (tx, rx) = channel(32);
28
29    // Spawn a task to process requests for the connection.
30    tokio::spawn(async move { run(client, rx).await });
31
32    // Return the `Buffer` handle.
33    Buffer { tx }
34}
35
36// Enum used to message pass the requested command from the `Buffer` handle
37#[derive(Debug)]
38enum Command {
39    Get(String),
40    Set(String, Bytes),
41}
42
43// Message type sent over the channel to the connection task.
44//
45// `Command` is the command to forward to the connection.
46//
47// `oneshot::Sender` is a channel type that sends a **single** value. It is used
48// here to send the response received from the connection back to the original
49// requester.
50type Message = (Command, oneshot::Sender<Result<Option<Bytes>>>);
51
52/// Receive commands sent through the channel and forward them to client. The
53/// response is returned back to the caller via a `oneshot`.
54async fn run(mut client: Client, mut rx: Receiver<Message>) {
55    // Repeatedly pop messages from the channel. A return value of `None`
56    // indicates that all `Buffer` handles have dropped and there will never be
57    // another message sent on the channel.
58    while let Some((cmd, tx)) = rx.recv().await {
59        // The command is forwarded to the connection
60        let response = match cmd {
61            Command::Get(key) => client.get(&key).await,
62            Command::Set(key, value) => client.set(&key, value).await.map(|_| None),
63        };
64
65        // Send the response back to the caller.
66        //
67        // Failing to send the message indicates the `rx` half dropped
68        // before receiving the message. This is a normal runtime event.
69        let _ = tx.send(response);
70    }
71}
72
73#[derive(Clone)]
74pub struct Buffer {
75    tx: Sender<Message>,
76}
77
78impl Buffer {
79    /// Get the value of a key.
80    ///
81    /// Same as `Client::get` but requests are **buffered** until the associated
82    /// connection has the ability to send the request.
83    pub async fn get(&mut self, key: &str) -> Result<Option<Bytes>> {
84        // Initialize a new `Get` command to send via the channel.
85        let get = Command::Get(key.into());
86
87        // Initialize a new oneshot to be used to receive the response back from the connection.
88        let (tx, rx) = oneshot::channel();
89
90        // Send the request
91        self.tx.send((get, tx)).await?;
92
93        // Await the response
94        match rx.await {
95            Ok(res) => res,
96            Err(err) => Err(err.into()),
97        }
98    }
99
100    /// Set `key` to hold the given `value`.
101    ///
102    /// Same as `Client::set` but requests are **buffered** until the associated
103    /// connection has the ability to send the request
104    pub async fn set(&mut self, key: &str, value: Bytes) -> Result<()> {
105        // Initialize a new `Set` command to send via the channel.
106        let set = Command::Set(key.into(), value);
107
108        // Initialize a new oneshot to be used to receive the response back from the connection.
109        let (tx, rx) = oneshot::channel();
110
111        // Send the request
112        self.tx.send((set, tx)).await?;
113
114        // Await the response
115        match rx.await {
116            Ok(res) => res.map(|_| ()),
117            Err(err) => Err(err.into()),
118        }
119    }
120}