1use fs4::FileExt;
20use std::marker::PhantomData;
21use thiserror::Error;
22
23pub use serde;
26pub use serde_json;
27
/// Name of the subdirectory (inside the queue directory) that holds the
/// entries that are ready to be dequeued.
const ENTRIES_DIRECTORY: &str = "entries";
/// Name of the subdirectory (inside the queue directory) where an entry is
/// first written before it is moved into the entries directory.
const TMP_DIRECTORY: &str = "tmp";
30
/// Errors reported by the queue operations in this file.
#[derive(Debug, Error)]
pub enum Error {
    /// The path handed to `Queue::new` already exists on disk.
    #[error("The given directory already exists")]
    DirectoryAlreadyExists,
    /// The path handed to `Queue::open` is not a directory, or it lacks the
    /// `entries` / `tmp` subdirectories.
    #[error("The given directory is not a valid queue directory")]
    NotAQueueDirectory,
    /// Serializing a payload to JSON, or deserializing one from JSON, failed.
    #[error("Failed to JSON serialize/deserialize: {0:?}")]
    JSONSerialize(#[from] serde_json::Error),
    /// An underlying file-system operation failed.
    #[error("I/O error: {0:?}")]
    IO(#[from] std::io::Error),
    /// A stored entry carries a version number different from the one this
    /// code writes; the value is the version found in the entry.
    #[error("The data in the queue has an incompatible data format: {0}")]
    IncompatibleQueueDataVersion(u32),
}
44
/// Shorthand for `std::result::Result` whose error type defaults to this
/// file's [`Error`].
type Result<T, E = Error> = std::result::Result<T, E>;
46
/// Handle to a queue of `T` values that is stored as files inside a directory.
///
/// The handle itself only remembers the directory path; all state lives on
/// the file system.
pub struct Queue<T> {
    /// Root directory of the queue (contains the `entries` and `tmp`
    /// subdirectories).
    path: std::path::PathBuf,
    /// Ties the handle to the element type `T` without storing a `T`.
    _phantom_data: PhantomData<T>,
}
63
/// Returns the version number stamped into every payload written by this
/// code and expected on every payload read back.
const fn current_version() -> u32 {
    const FORMAT_VERSION: u32 = 1;
    FORMAT_VERSION
}
67
/// Envelope that is serialized to disk for each queue entry: the user data
/// plus the format version it was written with.
#[derive(serde::Serialize, serde::Deserialize)]
struct Payload<T> {
    /// Format version the entry was written with.
    version: u32,
    /// The user-supplied value.
    data: T,
}
77
78impl<T> Payload<T> {
79 #[inline(always)]
80 const fn new(payload: T) -> Self {
81 Self {
82 version: current_version(),
83 data: payload,
84 }
85 }
86
87 #[inline]
88 fn get(self) -> T {
89 self.data
90 }
91}
92
93impl<T> Queue<T>
94where
95 T: serde::de::DeserializeOwned + serde::Serialize,
96{
97 pub fn open_or_create(path: std::path::PathBuf) -> Result<Self> {
100 if let Ok(s) = Self::open(path.clone()) {
101 return Ok(s);
102 }
103
104 Self::new(path)
105 }
106
107 pub fn new(path: std::path::PathBuf) -> Result<Self> {
110 if path.exists() {
111 return Err(Error::DirectoryAlreadyExists);
112 }
113
114 std::fs::create_dir(&path)?;
115 std::fs::create_dir(path.join(TMP_DIRECTORY))?;
116 std::fs::create_dir(path.join(ENTRIES_DIRECTORY))?;
117
118 Ok(Self {
119 path,
120 _phantom_data: Default::default(),
121 })
122 }
123
124 pub fn open(path: std::path::PathBuf) -> Result<Self> {
128 if !path.is_dir() {
129 return Err(Error::NotAQueueDirectory);
130 }
131
132 if !path.join(ENTRIES_DIRECTORY).is_dir() {
133 return Err(Error::NotAQueueDirectory);
134 }
135
136 if !path.join(TMP_DIRECTORY).is_dir() {
137 return Err(Error::NotAQueueDirectory);
138 }
139
140 Ok(Self {
141 path,
142 _phantom_data: Default::default(),
143 })
144 }
145
146 pub fn enqueue(&self, payload: T) -> Result<u64> {
149 loop {
157 let uid = rand::random::<u64>();
158 let uids = uid.to_string();
159 let path = self.path.join(TMP_DIRECTORY).join(&uids);
160 if path.exists() {
161 continue;
162 }
163 let mut fh = match std::fs::File::create(&path) {
164 Err(e) if e.kind() == std::io::ErrorKind::AlreadyExists => {
165 continue;
166 }
167 Ok(o) => o,
168 Err(e) => return Err(e.into()),
169 };
170 let payload = Payload::new(payload);
171 serde_json::to_writer(&mut fh, &payload)?;
172
173 std::fs::rename(path, self.path.join(ENTRIES_DIRECTORY).join(uids))?;
174
175 return Ok(uid);
176 }
177 }
178
179 pub fn dequeue(&self) -> Result<Option<T>> {
185 let mut counter = 0;
186 loop {
187 if counter >= 5 {
188 return Ok(None);
190 }
191 counter += 1;
192 return match self.dequeue_try_once() {
193 Err(Error::IO(e)) if e.kind() == std::io::ErrorKind::NotFound => {
194 continue;
195 }
196 Err(Error::IO(e)) if e.kind() == fs4::lock_contended_error().kind() => {
197 continue;
198 }
199 Err(e) => return Err(e),
200 Ok(r) => Ok(r),
201 };
202 }
203 }
204
205 pub fn dequeue_try_once(&self) -> Result<Option<T>> {
211 let p = self.path.join(ENTRIES_DIRECTORY);
212 let d = match std::fs::read_dir(&p)?.next() {
218 Some(p) => p?.path(),
219 None => return Ok(None),
220 };
221
222 let mut fh = std::fs::File::open(&d)?;
223
224 if let Err(e) = fh.try_lock_exclusive() {
231 return Err(e.into());
232 }
233 let data: Payload<T> = serde_json::from_reader(&mut fh)?;
234
235 if data.version != current_version() {
236 return Err(Error::IncompatibleQueueDataVersion(data.version));
237 }
238
239 std::fs::remove_file(&d)?;
240
241 Ok(Some(data.get()))
242 }
243}
244
#[cfg(test)]
mod tests {
    use super::*;

    /// Subdirectory of the temp dir in which each test creates its queue, so
    /// that `Queue::new` is handed a path that does not exist yet.
    const QUEUE_SUBDIR: &str = "dir";

    /// Creates a brand-new queue below `root`.
    fn queue_in<T>(root: &tempfile::TempDir) -> Queue<T>
    where
        T: serde::de::DeserializeOwned + serde::Serialize,
    {
        Queue::new(root.path().join(QUEUE_SUBDIR)).unwrap()
    }

    #[test]
    fn queue_dequeue_try_once_new_path() {
        let root = tempfile::tempdir().unwrap();
        let queue: Queue<u32> = queue_in(&root);

        let first_id = queue.enqueue(32).unwrap();
        let second_id = queue.enqueue(128).unwrap();
        assert_ne!(first_id, second_id);

        // No dequeue order is assumed: both values just have to come back.
        let popped = [
            queue.dequeue_try_once().unwrap(),
            queue.dequeue_try_once().unwrap(),
        ];
        assert!(popped.contains(&Some(32)));
        assert!(popped.contains(&Some(128)));

        let leftover = queue.dequeue_try_once().unwrap();
        assert_eq!(leftover, None);
    }

    #[test]
    fn queue_dequeue_new_path() {
        let root = tempfile::tempdir().unwrap();
        let queue: Queue<String> = queue_in(&root);

        // A fresh queue is empty.
        assert_eq!(queue.dequeue().ok(), Some(None));

        queue.enqueue("foo".to_owned()).unwrap();
        queue.enqueue("bar".to_owned()).unwrap();

        let popped = [queue.dequeue().unwrap(), queue.dequeue().unwrap()];
        assert!(popped.contains(&Some("foo".to_owned())));
        assert!(popped.contains(&Some("bar".to_owned())));

        // Both entries are gone again.
        assert_eq!(queue.dequeue().ok(), Some(None));
    }

    #[test]
    fn queue_dequeue_already_locked() {
        let root = tempfile::tempdir().unwrap();
        let queue: Queue<String> = queue_in(&root);

        assert_eq!(queue.dequeue().ok(), Some(None));

        let id = queue.enqueue("foo".to_owned()).unwrap();

        // Hold an exclusive lock on the entry file from a second handle.
        let entry_path = root
            .path()
            .join(QUEUE_SUBDIR)
            .join(ENTRIES_DIRECTORY)
            .join(id.to_string());
        let lock_holder = std::fs::File::open(entry_path).unwrap();
        lock_holder.lock_exclusive().unwrap();

        // While the lock is held the entry cannot be dequeued.
        let while_locked = queue.dequeue().unwrap();
        assert_eq!(while_locked, None);

        // Dropping the handle releases the lock and the entry becomes available.
        drop(lock_holder);
        let after_unlock = queue.dequeue().unwrap();
        assert_eq!(after_unlock, Some("foo".to_string()));
    }
}