mini_redis/buffer.rs
1use crate::client::Client;
2use crate::Result;
3
4use bytes::Bytes;
5use tokio::sync::mpsc::{channel, Receiver, Sender};
6use tokio::sync::oneshot;
7
8/// Create a new client request buffer
9///
10/// The `Client` performs Redis commands directly on the TCP connection. Only a
11/// single request may be in-flight at a given time and operations require
12/// mutable access to the `Client` handle. This prevents using a single Redis
13/// connection from multiple Tokio tasks.
14///
15/// The strategy for dealing with this class of problem is to spawn a dedicated
16/// Tokio task to manage the Redis connection and using "message passing" to
17/// operate on the connection. Commands are pushed into a channel. The
18/// connection task pops commands off of the channel and applies them to the
19/// Redis connection. When the response is received, it is forwarded to the
20/// original requester.
21///
22/// The returned `Buffer` handle may be cloned before passing the new handle to
23/// separate tasks.
24pub fn buffer(client: Client) -> Buffer {
25 // Setting the message limit to a hard coded value of 32. in a real-app, the
26 // buffer size should be configurable, but we don't need to do that here.
27 let (tx, rx) = channel(32);
28
29 // Spawn a task to process requests for the connection.
30 tokio::spawn(async move { run(client, rx).await });
31
32 // Return the `Buffer` handle.
33 Buffer { tx }
34}
35
36// Enum used to message pass the requested command from the `Buffer` handle
37#[derive(Debug)]
38enum Command {
39 Get(String),
40 Set(String, Bytes),
41}
42
43// Message type sent over the channel to the connection task.
44//
45// `Command` is the command to forward to the connection.
46//
47// `oneshot::Sender` is a channel type that sends a **single** value. It is used
48// here to send the response received from the connection back to the original
49// requester.
50type Message = (Command, oneshot::Sender<Result<Option<Bytes>>>);
51
52/// Receive commands sent through the channel and forward them to client. The
53/// response is returned back to the caller via a `oneshot`.
54async fn run(mut client: Client, mut rx: Receiver<Message>) {
55 // Repeatedly pop messages from the channel. A return value of `None`
56 // indicates that all `Buffer` handles have dropped and there will never be
57 // another message sent on the channel.
58 while let Some((cmd, tx)) = rx.recv().await {
59 // The command is forwarded to the connection
60 let response = match cmd {
61 Command::Get(key) => client.get(&key).await,
62 Command::Set(key, value) => client.set(&key, value).await.map(|_| None),
63 };
64
65 // Send the response back to the caller.
66 //
67 // Failing to send the message indicates the `rx` half dropped
68 // before receiving the message. This is a normal runtime event.
69 let _ = tx.send(response);
70 }
71}
72
73#[derive(Clone)]
74pub struct Buffer {
75 tx: Sender<Message>,
76}
77
78impl Buffer {
79 /// Get the value of a key.
80 ///
81 /// Same as `Client::get` but requests are **buffered** until the associated
82 /// connection has the ability to send the request.
83 pub async fn get(&mut self, key: &str) -> Result<Option<Bytes>> {
84 // Initialize a new `Get` command to send via the channel.
85 let get = Command::Get(key.into());
86
87 // Initialize a new oneshot to be used to receive the response back from the connection.
88 let (tx, rx) = oneshot::channel();
89
90 // Send the request
91 self.tx.send((get, tx)).await?;
92
93 // Await the response
94 match rx.await {
95 Ok(res) => res,
96 Err(err) => Err(err.into()),
97 }
98 }
99
100 /// Set `key` to hold the given `value`.
101 ///
102 /// Same as `Client::set` but requests are **buffered** until the associated
103 /// connection has the ability to send the request
104 pub async fn set(&mut self, key: &str, value: Bytes) -> Result<()> {
105 // Initialize a new `Set` command to send via the channel.
106 let set = Command::Set(key.into(), value);
107
108 // Initialize a new oneshot to be used to receive the response back from the connection.
109 let (tx, rx) = oneshot::channel();
110
111 // Send the request
112 self.tx.send((set, tx)).await?;
113
114 // Await the response
115 match rx.await {
116 Ok(res) => res.map(|_| ()),
117 Err(err) => Err(err.into()),
118 }
119 }
120}