//! `fabric_cache_client/client.rs` — TCP client for a fabric server.

use crate::Error;
use serde::{Deserialize, Serialize};
use tokio::{
    io::{AsyncReadExt, AsyncWriteExt},
    net::TcpStream,
};

/// Client for interacting with your fabric server.
///
/// Wraps a single TCP connection. Every command method takes `&mut self`,
/// writes one newline-terminated text command to the stream and then reads
/// the server's reply from the same stream.
#[derive(Debug)]
pub struct FabricClient {
    /// The open connection that commands are written to and replies read from.
    stream: TcpStream,
}
impl FabricClient {
    /// Open a connection to your fabric server.
    pub async fn connect(addr: &str) -> tokio::io::Result<Self> {
        let stream = TcpStream::connect(addr).await?;
        Ok(FabricClient { stream })
    }

    /// Perform the SET command on a provided key to
    /// either insert, or update the value of the key.
    ///
    /// NOTE: That any data structure `T` for the value
    /// must implement the `serde::Serialize` trait.
    pub async fn set<T: Serialize>(&mut self, key: &str, value: &T) -> Result<(), Error> {
        let serialized_data = serde_json::to_string(value).map_err(Error::BadDataStructure)?;

        let command = format!("SET {} {}\n", key, serialized_data);
        self.stream.write_all(command.as_bytes()).await?;
        self.stream.flush().await?;

        let mut buffer = vec![0; 512];
        let n = self.stream.read(&mut buffer).await?;

        let resp = String::from_utf8_lossy(&buffer[..n]);
        if resp.contains("OK") {
            Ok(())
        } else {
            Err(Error::Unknown(resp.to_string()))
        }
    }

    /// Perform the GET command on a provided key to
    /// grab the current value of the key.
    ///
    /// NOTE: You must specify your return type and it
    /// needs to implement the `serde::Deserialize` trait.
    pub async fn get<S: Into<String>, T>(&mut self, key: S) -> Result<T, Error>
    where
        T: for<'de> Deserialize<'de>,
    {
        let command = format!("GET {}\n", key.into());
        self.stream.write_all(command.as_bytes()).await?;
        self.stream.flush().await?;

        let mut buffer = vec![0; 512];
        let n = self.stream.read(&mut buffer).await?;
        let response = String::from_utf8_lossy(&buffer[..n]).to_string();

        let value: T = serde_json::from_str(&response)?;
        Ok(value)
    }

    /// Perform the REMOVE command on a provided key to
    /// remove the key/value pair from cache.
    pub async fn remove(&mut self, key: &str) -> Result<(), Error> {
        let command = format!("REMOVE {}\n", key);
        self.stream.write_all(command.as_bytes()).await?;
        self.stream.flush().await?;

        let mut buffer = vec![0; 512];
        let n = self.stream.read(&mut buffer).await?;

        let resp = String::from_utf8_lossy(&buffer[..n]);
        if resp.contains("OK") {
            Ok(())
        } else {
            Err(Error::Unknown(resp.to_string()))
        }
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;
    use tokio::io::{AsyncReadExt, AsyncWriteExt};
    use tokio::net::TcpListener;

    /// Start a throwaway server on an OS-assigned local port and return its address.
    ///
    /// The spawned task accepts a single connection, reads one request and
    /// answers according to the command prefix, then finishes.
    async fn mock_server() -> String {
        let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
        let bound_addr = listener.local_addr().unwrap().to_string();

        tokio::spawn(async move {
            let (mut conn, _peer) = match listener.accept().await {
                Ok(accepted) => accepted,
                Err(_) => return,
            };

            let mut request = [0; 512];
            let len = conn.read(&mut request).await.unwrap();
            let text = String::from_utf8_lossy(&request[..len]);

            // The three prefixes are mutually exclusive, so the order of the
            // checks does not affect which reply is chosen.
            let reply = if text.starts_with("GET") {
                r#""value""#
            } else if text.starts_with("SET") || text.starts_with("REMOVE") {
                "OK\n"
            } else {
                "ERROR\n"
            };
            conn.write_all(reply.as_bytes()).await.unwrap();
        });

        bound_addr
    }

    /// SET against the mock server reports success.
    #[tokio::test]
    async fn test_set_command() {
        let server = mock_server().await;
        let mut client = FabricClient::connect(&server).await.unwrap();

        let payload = json!({"data": "value"});
        let outcome = client.set("test_key", &payload).await;

        assert!(outcome.is_ok());
    }

    /// GET against the mock server yields the canned JSON string.
    #[tokio::test]
    async fn test_get_command() {
        let server = mock_server().await;
        let mut client = FabricClient::connect(&server).await.unwrap();

        let fetched = client.get::<&str, String>("test_key").await;

        assert!(fetched.is_ok());
        assert_eq!(fetched.unwrap(), json!("value"));
    }

    /// REMOVE against the mock server reports success.
    #[tokio::test]
    async fn test_remove_command() {
        let server = mock_server().await;
        let mut client = FabricClient::connect(&server).await.unwrap();

        let outcome = client.remove("test_key").await;

        assert!(outcome.is_ok());
    }
}