fabric_cache_client/
client.rs1use crate::Error;
2use serde::{Deserialize, Serialize};
3use tokio::{
4 io::{AsyncReadExt, AsyncWriteExt},
5 net::TcpStream,
6};
7
8pub struct FabricClient {
10 stream: TcpStream,
11}
12impl FabricClient {
13 pub async fn connect(addr: &str) -> Result<Self, Error> {
15 let stream = TcpStream::connect(addr).await?;
16 Ok(FabricClient { stream })
17 }
18
19 pub async fn set<T: Serialize>(&mut self, key: &str, value: &T) -> Result<(), Error> {
25 let serialized_data = serde_json::to_string(value).map_err(Error::BadDataStructure)?;
26
27 let command = format!("SET {} {}\n", key, serialized_data);
28 self.stream.write_all(command.as_bytes()).await?;
29 self.stream.flush().await?;
30
31 let mut buffer = vec![0; 512];
32 let n = self.stream.read(&mut buffer).await?;
33
34 let resp = String::from_utf8_lossy(&buffer[..n]);
35 if resp.contains("OK") {
36 Ok(())
37 } else {
38 Err(Error::Unknown(resp.to_string()))
39 }
40 }
41
42 pub async fn get<S: Into<String>, T>(&mut self, key: S) -> Result<T, Error>
48 where
49 T: for<'de> Deserialize<'de>,
50 {
51 let command = format!("GET {}\n", key.into());
52 self.stream.write_all(command.as_bytes()).await?;
53 self.stream.flush().await?;
54
55 let mut buffer = vec![0; 512];
56 let n = self.stream.read(&mut buffer).await?;
57 let response = String::from_utf8_lossy(&buffer[..n]).to_string();
58
59 let value: T = serde_json::from_str(&response)?;
60 Ok(value)
61 }
62
63 pub async fn remove(&mut self, key: &str) -> Result<(), Error> {
66 let command = format!("REMOVE {}\n", key);
67 self.stream.write_all(command.as_bytes()).await?;
68 self.stream.flush().await?;
69
70 let mut buffer = vec![0; 512];
71 let n = self.stream.read(&mut buffer).await?;
72
73 let resp = String::from_utf8_lossy(&buffer[..n]);
74 if resp.contains("OK") {
75 Ok(())
76 } else {
77 Err(Error::Unknown(resp.to_string()))
78 }
79 }
80}
81
82#[cfg(test)]
83mod tests {
84 use super::*;
85 use serde_json::json;
86 use tokio::io::{AsyncReadExt, AsyncWriteExt};
87 use tokio::net::TcpListener;
88
89 async fn mock_server() -> String {
90 let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
91 let addr = listener.local_addr().unwrap();
92 tokio::spawn(async move {
93 if let Ok((mut socket, _)) = listener.accept().await {
94 let mut buffer = [0; 512];
95 let n = socket.read(&mut buffer).await.unwrap();
96 let command = String::from_utf8_lossy(&buffer[..n]);
97 let response = if command.starts_with("SET") {
98 "OK\n"
99 } else if command.starts_with("GET") {
100 r#""value""#
101 } else if command.starts_with("REMOVE") {
102 "OK\n"
103 } else {
104 "ERROR\n"
105 };
106 socket.write_all(response.as_bytes()).await.unwrap();
107 }
108 });
109 addr.to_string()
110 }
111
112 #[tokio::test]
113 async fn test_set_command() {
114 let addr = mock_server().await;
115 let mut client = FabricClient::connect(&addr).await.unwrap();
116
117 let key = "test_key";
118 let value = json!({"data": "value"});
119
120 let result = client.set(key, &value).await;
121 assert!(result.is_ok());
122 }
123
124 #[tokio::test]
125 async fn test_get_command() {
126 let addr = mock_server().await;
127 let mut client = FabricClient::connect(&addr).await.unwrap();
128
129 let key = "test_key";
130 let result: Result<String, Error> = client.get(key).await;
131
132 assert!(result.is_ok());
133 let value = result.unwrap();
134 assert_eq!(value, json!("value"));
135 }
136
137 #[tokio::test]
138 async fn test_remove_command() {
139 let addr = mock_server().await;
140 let mut client = FabricClient::connect(&addr).await.unwrap();
141
142 let key = "test_key";
143 let result = client.remove(key).await;
144
145 assert!(result.is_ok());
146 }
147}