ffly_rs/
lib.rs

1// TODO: Write tests
2use core::fmt;
3use std::{
4    error::Error,
5    sync::Arc,
6    time::{SystemTime, UNIX_EPOCH},
7};
8
9use tokio::{
10    io::{AsyncReadExt, AsyncWriteExt},
11    net::TcpStream,
12    sync::Mutex,
13};
14
/// Boxed, thread-safe error used as the catch-all failure type.
pub type GenericError = Box<dyn Error + Send + Sync + 'static>;

/// Result whose error side is always a [`GenericError`].
pub type FireflyResult<T> = Result<T, GenericError>;
/// Result of an operation that produces no value on success.
pub type OptResult = Result<(), GenericError>;
/// Result of an operation that produces a `String` on success.
pub type StringResult = Result<String, GenericError>;
21
22pub struct FireflyStream {
23    /// The stream to the Firefly server.
24    tcp_stream: Arc<Mutex<TcpStream>>,
25
26    /// The maximum length of the response. (size for response buffer)
27    max_buffer_size: usize,
28
29    /// The default TTL for new records.
30    /// If this value is not zero, it will be added to the current timestamp.
31    /// So this is the TTL from when the new is executed.
32    pub default_ttl: usize,
33}
34
/// Errors produced by the Firefly client itself.
#[derive(Debug)]
pub enum FireflyError {
    /// The response from the server did not have the expected format.
    UnexpectedResponseError,
}

impl fmt::Display for FireflyError {
    /// Writes the variant name, which is the same text the `Debug` output has.
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        // Exhaustive on purpose: a new variant must be given its text here.
        let name = match self {
            Self::UnexpectedResponseError => "UnexpectedResponseError",
        };
        f.write_str(name)
    }
}

impl Error for FireflyError {}
47
/// Current wall-clock time as whole seconds elapsed since the UNIX epoch.
///
/// # Panics
///
/// Panics if the system clock reports a time before the UNIX epoch.
fn current_epoch() -> usize {
    let since_epoch = SystemTime::now().duration_since(UNIX_EPOCH).unwrap();
    let seconds = since_epoch.as_secs();
    seconds as usize
}
55
56impl FireflyStream {
57    /// Instantiate a new TCP connection with a Firefly server.
58    /// Fails if the connection cannot be established. The expected buffer
59    /// size is set to 512.
60    ///
61    /// # Arguments
62    ///
63    /// * `address` - The address of the Firefly server. (e.g. "127.0.0.1:46600")
64    pub async fn connect(address: &str) -> FireflyResult<Self> {
65        Self::connect_with_max_buffer(address, 512).await
66    }
67
68    /// Same as `FireflyStream::connect`, but with a custom buffer size.
69    /// The buffer size is the maximum expected response size.
70    ///
71    /// # Arguments
72    ///
73    /// * `address` - The address of the Firefly server. (e.g. "127.0.0.1:46600")
74    /// * `max_buffer_size` - The maximum expected response size.
75    pub async fn connect_with_max_buffer(
76        address: &str,
77        max_buffer_size: usize,
78    ) -> FireflyResult<Self> {
79        let tcp_stream = Arc::new(Mutex::new(TcpStream::connect(address).await?));
80        let client = Self {
81            max_buffer_size,
82            tcp_stream,
83            default_ttl: 0,
84        };
85
86        client.send_ok("QUERY TYPE BITWISE;".as_bytes()).await?;
87
88        Ok(client)
89    }
90
91    /// Send a slice of bytes to the Firefly server.
92    ///
93    /// # Arguments
94    ///
95    /// * `data` - The slice of bytes to send.
96    async fn send_no_check(&self, data: &[u8]) -> StringResult {
97        let mut stream = self.tcp_stream.lock().await;
98        stream.write(data).await?;
99
100        let mut buffer = vec![0; self.max_buffer_size];
101        let response_size = stream.read(&mut buffer).await?;
102
103        // The server will ALWAYS return valid utf8
104        Ok(String::from_utf8(buffer[..response_size].to_vec()).unwrap())
105    }
106
107    /// The same as the `FireflyStream::send_no_check` method, but check the response.
108    ///
109    /// # Arguments
110    ///
111    /// * `data` - The slice of bytes to send.
112    /// * `expected` - A closure predicate that returns true if the response is valid.
113    async fn send(&self, data: &[u8], expected: fn(&str) -> bool) -> StringResult {
114        let response = self.send_no_check(data).await?;
115
116        if expected(&response) {
117            return Ok(response);
118        }
119
120        Err(FireflyError::UnexpectedResponseError.into())
121    }
122
123    /// Same as send, but checks if the response contains "Ok" or doesn't
124    /// contain "Error".
125    ///
126    /// # Arguments
127    ///
128    /// * `data` - The slice of bytes to send.
129    async fn send_ok(&self, data: &[u8]) -> StringResult {
130        self.send(data, |expected| {
131            expected.contains("Ok") || !expected.contains("Error")
132        })
133        .await
134    }
135
136    /// Create a new record with the default TTL.
137    /// The default TTL is 0. (record lasts for ever)
138    ///
139    /// # Arguments
140    ///
141    /// * `key` - Your unique key for the record.
142    /// * `value` - The value of the record.
143    pub async fn new(&self, key: &str, value: &str) -> OptResult {
144        let mut ttl = self.default_ttl;
145
146        if ttl != 0 {
147            ttl += current_epoch();
148        }
149
150        self.new_with_ttl(key, value, ttl).await
151    }
152
153    /// Same as `FireflyStream::new`, but with a custom TTL.
154    /// The TTL is the timestamp since the UNIX epoch.
155    ///
156    /// # Arguments
157    ///
158    /// * `key` - Your unique key for the record.
159    /// * `value` - The value of the record.
160    /// * `ttl` - The timestamp since the UNIX epoch for the data to expire. (0 = never)
161    pub async fn new_with_ttl(&self, key: &str, value: &str, ttl: usize) -> OptResult {
162        let query = format!("0{key}\0{value}\0{ttl}");
163        self.send_ok(query.as_bytes()).await?;
164
165        Ok(())
166    }
167
168    /// Get a record from the Firefly server. If you only need the value or ttl
169    /// use the specific methods for those purposes. As this returns both values.
170    ///
171    /// # Arguments
172    ///
173    /// * `key` - The key of the record.
174    pub async fn get(&self, key: &str) -> FireflyResult<(String, usize)> {
175        let query = format!("1{key}");
176        let data = self
177            .send(query.as_bytes(), |response| response.contains(0 as char))
178            .await?;
179
180        match data.split_once(0 as char) {
181            Some((value, ttl)) => Ok((value.to_string(), ttl.parse().unwrap())),
182            None => Err(FireflyError::UnexpectedResponseError.into()),
183        }
184    }
185
186    /// Same as `FireflyStream::get`, but only returns the value.
187    ///
188    /// # Arguments
189    ///
190    /// * `key` - The key of the record.
191    pub async fn get_value(&self, key: &str) -> StringResult {
192        self.send_ok(format!("2{key}").as_bytes()).await
193    }
194
195    /// Same as `FireflyStream::get`, but only returns the ttl.
196    ///
197    /// # Arguments
198    ///
199    /// * `key` - The key of the record.
200    pub async fn get_ttl(&self, key: &str) -> FireflyResult<usize> {
201        let ttl = self.send_ok(format!("3{key}").as_bytes()).await?;
202        Ok(ttl.parse()?)
203    }
204
205    /// Remove a record from the Firefly server.
206    ///
207    /// # Arguments
208    ///
209    /// * `key` - The key of the record.
210    pub async fn drop(&self, key: &str) -> OptResult {
211        self.send_ok(format!("4{key}").as_bytes()).await?;
212        Ok(())
213    }
214
215    /// Remove ALL records that have a certain value.
216    /// Using this method is generally discouraged. As it is a heavy operation.
217    ///
218    /// # Arguments
219    ///
220    /// * `value` - The valy of ANY record that should be removed.
221    pub async fn drop_values(&self, value: &str) -> OptResult {
222        self.send_ok(format!("5{value}").as_bytes()).await?;
223        Ok(())
224    }
225}