kade_proto/clients/
buffered_client.rs

1use crate::clients::Client;
2use crate::Result;
3
4use bytes::Bytes;
5use tokio::sync::mpsc::{channel, Receiver, Sender};
6use tokio::sync::oneshot;
7
8#[derive(Debug)]
9enum Command {
10    Get(String),
11    Set(String, Bytes),
12}
13
14type Message = (Command, oneshot::Sender<Result<Option<Bytes>>>);
15
16async fn run(mut client: Client, mut rx: Receiver<Message>) {
17    while let Some((cmd, tx)) = rx.recv().await {
18        let response = match cmd {
19            Command::Get(key) => client.get(&key).await,
20            Command::Set(key, value) => client.set(&key, value).await.map(|_| None),
21        };
22
23        let _ = tx.send(response);
24    }
25}
26
27#[derive(Clone)]
28pub struct BufferedClient {
29    tx: Sender<Message>,
30}
31
32impl BufferedClient {
33    pub fn buffer(client: Client) -> BufferedClient {
34        let (tx, rx) = channel(32);
35
36        tokio::spawn(async move { run(client, rx).await });
37
38        BufferedClient { tx }
39    }
40
41    pub async fn get(&mut self, key: &str) -> Result<Option<Bytes>> {
42        let get = Command::Get(key.into());
43        let (tx, rx) = oneshot::channel();
44
45        self.tx.send((get, tx)).await?;
46
47        // Awaitthe response
48        match rx.await {
49            Ok(res) => res,
50            Err(err) => Err(err.into()),
51        }
52    }
53
54    pub async fn set(&mut self, key: &str, value: Bytes) -> Result<()> {
55        let set = Command::Set(key.into(), value);
56        let (tx, rx) = oneshot::channel();
57
58        self.tx.send((set, tx)).await?;
59
60        match rx.await {
61            Ok(res) => res.map(|_| ()),
62            Err(err) => Err(err.into()),
63        }
64    }
65}