1mod pipe_builder;
2
3use std::ops::Range;
4use pipe_builder::PipeBuilder;
5use regex::Regex;
6use tokio::{
7 io::{AsyncBufReadExt, AsyncWriteExt, BufReader},
8 net::TcpStream,
9};
10
/// Writes `$command` (anything exposing `as_bytes()`) to `$socket` with
/// `write_all` and awaits completion.
///
/// On a write failure the expansion leaves the enclosing function early via
/// `?`, carrying the message `"Failed to send the command: <io error>"`.
/// Because it expands to `.await` and `?`, it can only be used inside an async
/// context whose error type can be built from a `String`.
macro_rules! send_command {
    ($socket:expr, $command:expr) => {{
        let write_result = $socket.write_all($command.as_bytes()).await;
        write_result.map_err(|err| format!("Failed to send the command: {err}"))?;
    }};
}
19
/// Selector accepted by the `LST` and `CNT` commands.
///
/// The value is interpolated into the command line as-is: `Number` with its
/// decimal representation, `Range` with its `Debug` form (`start..end`).
#[derive(Debug)]
pub enum Expression {
    /// A single integer argument.
    Number(i32),
    /// A half-open integer range, sent as `start..end`.
    Range(Range<i32>),
}
25
/// Selector accepted by the `DEL` command.
///
/// The value is interpolated into the command line as-is: `Number` with its
/// decimal representation, `ID` verbatim, and `Range` with its `Debug` form
/// (`start..end`).
#[derive(Debug)]
pub enum DeleteExpression<'a> {
    /// A single integer argument.
    Number(i32),
    /// A borrowed item identifier.
    ID(&'a str),
    /// A half-open integer range, sent as `start..end`.
    Range(Range<i32>),
}
32
/// One reply line from the server, split into its two parts.
#[derive(Debug)]
pub struct ServerResponse {
    /// Text before the first space of the reply line; the client treats the
    /// value `"err"` as a failure.
    pub status: String,
    /// Everything after the first space of the reply line.
    pub data: String,
}
38
/// A single `("id", "data")` pair extracted from a tuple-list reply.
#[derive(Debug)]
pub struct Item {
    /// First element of the pair.
    pub id: String,
    /// Second element of the pair.
    pub data: String,
}
44
45#[derive(Debug)]
46pub struct IrisClient {
47 socket: TcpStream,
48}
49
50impl IrisClient {
51 pub async fn set(self: &mut Self, id: &str, data: &str) -> Result<String, String> {
52 send_command!(self.socket, format!("SET {id} {data}\n"));
53
54 let server_resp = self.server_response().await?;
55 Ok(server_resp.data)
56 }
57
58 pub async fn delete<'a>(self: &mut Self, expr: DeleteExpression<'a>) -> Result<Vec<Item>, String> {
59 match expr {
60 DeleteExpression::Number(count) => send_command!(self.socket, format!("DEL {count}\n")),
61 DeleteExpression::ID(id) => send_command!(self.socket, format!("DEL {id}\n")),
62 DeleteExpression::Range(range) => send_command!(self.socket, format!("DEL {:?}\n", range))
63 }
64
65 let server_resp = self.server_response().await?;
66 let deleted = self.parse_tuple(server_resp.data.as_str())?;
67
68 Ok(deleted)
69 }
70
71 pub async fn get(self: &mut Self, id: &str) -> Result<String, String> {
72 send_command!(self.socket, format!("GET {id}\n"));
73
74 let server_resp = self.server_response().await?;
75 Ok(server_resp.data)
76 }
77
78 pub async fn list(self: &mut Self, expr: Expression) -> Result<Vec<Item>, String> {
79 match expr {
80 Expression::Number(count) => send_command!(self.socket, format!("LST {count}\n")),
81 Expression::Range(range) => send_command!(self.socket, format!("LST {:?}\n", range))
82 }
83
84 let server_resp = self.server_response().await?;
85 let list = self.parse_tuple(server_resp.data.as_str()).unwrap();
86
87 Ok(list)
88 }
89
90 pub async fn count(self: &mut Self, expr: Expression) -> Result<u32, String> {
91 match expr {
92 Expression::Number(count) => send_command!(self.socket, format!("CNT {count}\n")),
93 Expression::Range(range) => send_command!(self.socket, format!("CNT {:?}\n", range))
94 }
95
96 let server_resp = self.server_response().await?;
97 let count = str::parse::<u32>(server_resp.data.as_str()).unwrap();
98
99 Ok(count)
100 }
101
102 pub async fn raw(self: &mut Self, command: &str) -> Result<ServerResponse, String> {
103 send_command!(self.socket, format!("{command}\n"));
104
105 let server_resp = self.server_response().await?;
106 Ok(server_resp)
107 }
108
109 pub fn pipe(self: &mut Self) -> PipeBuilder {
110 PipeBuilder {
111 command: String::new(),
112 client: self
113 }
114 }
115
116 async fn server_response(self: &mut Self) -> Result<ServerResponse, String> {
117 let mut buf_reader = BufReader::new(&mut self.socket);
118 let mut buffer = String::new();
119 let server_resp = match buf_reader.read_line(&mut buffer).await {
120 Ok(0) => return Err("Connection closed".to_string()),
121 Ok(_) => {
122 let response = self.parse_response(buffer.trim().to_string());
123
124 if response.status == "err" {
125 return Err(response.data);
126 }
127
128 response
129 }
130 Err(err) => return Err(format!("Failed to read server response: {err}")),
131 };
132
133 Ok(server_resp)
134 }
135
136 fn parse_response(&self, response: String) -> ServerResponse {
137 let parts: Vec<&str> = response.splitn(2, ' ').collect();
138
139 ServerResponse {
140 status: parts.get(0).unwrap().to_string(),
141 data: parts.get(1).unwrap().to_string(),
142 }
143 }
144
145 fn parse_tuple(&self, response: &str) -> Result<Vec<Item>, String> {
146 let regex = Regex::new(r#"\s*\[\s*(\(".*?",\s*".*?"\)\s*,?\s*)*\]\s*"#).unwrap();
147
148 if !regex.is_match(response) {
149 return Err("Invalid tuple response".to_string());
150 }
151
152 let mut result = Vec::new();
153
154 let pairs = Regex::new(r#"\("(.*?)",\s*"(.*?)"\)"#).unwrap();
155 for cap in pairs.captures_iter(response) {
156 let id = cap.get(1).unwrap().as_str().to_string();
157 let data = cap.get(2).unwrap().as_str().to_string();
158
159 result.push(Item {
160 id,
161 data
162 });
163 }
164
165 Ok(result)
166 }
167}
168
169pub async fn connect(addr: &str) -> Result<IrisClient, String> {
170 let socket = TcpStream::connect(addr)
171 .await
172 .map_err(|err| format!("Failed to connect: {err}"))?;
173
174 Ok(IrisClient { socket })
175}