ffly_rs/lib.rs
1// TODO: Write tests
2use core::fmt;
3use std::{
4 error::Error,
5 sync::Arc,
6 time::{SystemTime, UNIX_EPOCH},
7};
8
9use tokio::{
10 io::{AsyncReadExt, AsyncWriteExt},
11 net::TcpStream,
12 sync::Mutex,
13};
14
/// Catch-all error type: any boxed error that is `Send + Sync + 'static`.
///
/// Concrete error values convert into it through `?` / `.into()`.
pub type GenericError = Box<dyn Error + Send + Sync + 'static>;

/// Result whose success value is `T` and whose error is a [`GenericError`].
pub type FireflyResult<T> = Result<T, GenericError>;
/// Result of an operation that produces no value on success.
pub type OptResult = Result<(), GenericError>;
/// Result of an operation that produces a `String` on success.
pub type StringResult = Result<String, GenericError>;
21
22pub struct FireflyStream {
23 /// The stream to the Firefly server.
24 tcp_stream: Arc<Mutex<TcpStream>>,
25
26 /// The maximum length of the response. (size for response buffer)
27 max_buffer_size: usize,
28
29 /// The default TTL for new records.
30 /// If this value is not zero, it will be added to the current timestamp.
31 /// So this is the TTL from when the new is executed.
32 pub default_ttl: usize,
33}
34
/// Errors produced by the Firefly client itself.
#[derive(Debug)]
pub enum FireflyError {
    /// The server's response did not have the expected format.
    UnexpectedResponseError,
}

impl fmt::Display for FireflyError {
    /// Writes the variant name, i.e. the same text the `Debug` impl produces.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let name = match self {
            Self::UnexpectedResponseError => "UnexpectedResponseError",
        };
        f.write_str(name)
    }
}

impl Error for FireflyError {}
47
/// Number of whole seconds elapsed since the UNIX epoch.
///
/// # Panics
///
/// Panics if the system clock reports a time before the UNIX epoch.
fn current_epoch() -> usize {
    let now = SystemTime::now();
    let since_epoch = now.duration_since(UNIX_EPOCH).unwrap();
    since_epoch.as_secs() as usize
}
55
56impl FireflyStream {
57 /// Instantiate a new TCP connection with a Firefly server.
58 /// Fails if the connection cannot be established. The expected buffer
59 /// size is set to 512.
60 ///
61 /// # Arguments
62 ///
63 /// * `address` - The address of the Firefly server. (e.g. "127.0.0.1:46600")
64 pub async fn connect(address: &str) -> FireflyResult<Self> {
65 Self::connect_with_max_buffer(address, 512).await
66 }
67
68 /// Same as `FireflyStream::connect`, but with a custom buffer size.
69 /// The buffer size is the maximum expected response size.
70 ///
71 /// # Arguments
72 ///
73 /// * `address` - The address of the Firefly server. (e.g. "127.0.0.1:46600")
74 /// * `max_buffer_size` - The maximum expected response size.
75 pub async fn connect_with_max_buffer(
76 address: &str,
77 max_buffer_size: usize,
78 ) -> FireflyResult<Self> {
79 let tcp_stream = Arc::new(Mutex::new(TcpStream::connect(address).await?));
80 let client = Self {
81 max_buffer_size,
82 tcp_stream,
83 default_ttl: 0,
84 };
85
86 client.send_ok("QUERY TYPE BITWISE;".as_bytes()).await?;
87
88 Ok(client)
89 }
90
91 /// Send a slice of bytes to the Firefly server.
92 ///
93 /// # Arguments
94 ///
95 /// * `data` - The slice of bytes to send.
96 async fn send_no_check(&self, data: &[u8]) -> StringResult {
97 let mut stream = self.tcp_stream.lock().await;
98 stream.write(data).await?;
99
100 let mut buffer = vec![0; self.max_buffer_size];
101 let response_size = stream.read(&mut buffer).await?;
102
103 // The server will ALWAYS return valid utf8
104 Ok(String::from_utf8(buffer[..response_size].to_vec()).unwrap())
105 }
106
107 /// The same as the `FireflyStream::send_no_check` method, but check the response.
108 ///
109 /// # Arguments
110 ///
111 /// * `data` - The slice of bytes to send.
112 /// * `expected` - A closure predicate that returns true if the response is valid.
113 async fn send(&self, data: &[u8], expected: fn(&str) -> bool) -> StringResult {
114 let response = self.send_no_check(data).await?;
115
116 if expected(&response) {
117 return Ok(response);
118 }
119
120 Err(FireflyError::UnexpectedResponseError.into())
121 }
122
123 /// Same as send, but checks if the response contains "Ok" or doesn't
124 /// contain "Error".
125 ///
126 /// # Arguments
127 ///
128 /// * `data` - The slice of bytes to send.
129 async fn send_ok(&self, data: &[u8]) -> StringResult {
130 self.send(data, |expected| {
131 expected.contains("Ok") || !expected.contains("Error")
132 })
133 .await
134 }
135
136 /// Create a new record with the default TTL.
137 /// The default TTL is 0. (record lasts for ever)
138 ///
139 /// # Arguments
140 ///
141 /// * `key` - Your unique key for the record.
142 /// * `value` - The value of the record.
143 pub async fn new(&self, key: &str, value: &str) -> OptResult {
144 let mut ttl = self.default_ttl;
145
146 if ttl != 0 {
147 ttl += current_epoch();
148 }
149
150 self.new_with_ttl(key, value, ttl).await
151 }
152
153 /// Same as `FireflyStream::new`, but with a custom TTL.
154 /// The TTL is the timestamp since the UNIX epoch.
155 ///
156 /// # Arguments
157 ///
158 /// * `key` - Your unique key for the record.
159 /// * `value` - The value of the record.
160 /// * `ttl` - The timestamp since the UNIX epoch for the data to expire. (0 = never)
161 pub async fn new_with_ttl(&self, key: &str, value: &str, ttl: usize) -> OptResult {
162 let query = format!("0{key}\0{value}\0{ttl}");
163 self.send_ok(query.as_bytes()).await?;
164
165 Ok(())
166 }
167
168 /// Get a record from the Firefly server. If you only need the value or ttl
169 /// use the specific methods for those purposes. As this returns both values.
170 ///
171 /// # Arguments
172 ///
173 /// * `key` - The key of the record.
174 pub async fn get(&self, key: &str) -> FireflyResult<(String, usize)> {
175 let query = format!("1{key}");
176 let data = self
177 .send(query.as_bytes(), |response| response.contains(0 as char))
178 .await?;
179
180 match data.split_once(0 as char) {
181 Some((value, ttl)) => Ok((value.to_string(), ttl.parse().unwrap())),
182 None => Err(FireflyError::UnexpectedResponseError.into()),
183 }
184 }
185
186 /// Same as `FireflyStream::get`, but only returns the value.
187 ///
188 /// # Arguments
189 ///
190 /// * `key` - The key of the record.
191 pub async fn get_value(&self, key: &str) -> StringResult {
192 self.send_ok(format!("2{key}").as_bytes()).await
193 }
194
195 /// Same as `FireflyStream::get`, but only returns the ttl.
196 ///
197 /// # Arguments
198 ///
199 /// * `key` - The key of the record.
200 pub async fn get_ttl(&self, key: &str) -> FireflyResult<usize> {
201 let ttl = self.send_ok(format!("3{key}").as_bytes()).await?;
202 Ok(ttl.parse()?)
203 }
204
205 /// Remove a record from the Firefly server.
206 ///
207 /// # Arguments
208 ///
209 /// * `key` - The key of the record.
210 pub async fn drop(&self, key: &str) -> OptResult {
211 self.send_ok(format!("4{key}").as_bytes()).await?;
212 Ok(())
213 }
214
215 /// Remove ALL records that have a certain value.
216 /// Using this method is generally discouraged. As it is a heavy operation.
217 ///
218 /// # Arguments
219 ///
220 /// * `value` - The valy of ANY record that should be removed.
221 pub async fn drop_values(&self, value: &str) -> OptResult {
222 self.send_ok(format!("5{value}").as_bytes()).await?;
223 Ok(())
224 }
225}