diskqueue/
lib.rs

//! An unordered, disk-persisted queue of items of a given
//! serde-serializable type.
3//! The on-disk structure is considered an implementation detail and
4//! not a public interface. The data on disk is versioned and an
5//! effort is made to not consume incompatible data schemas across
6//! versions. Items that can't be consumed (e.g. due to incompatible
7//! versions) aren't removed from the queue and might bubble up again.
8//!
9//! Example usage:
10//! ```rust
11//!   # let t = tempfile::tempdir().unwrap();
12//!   # let path = t.path().join("foo").to_path_buf();
13//!   use diskqueue::Queue;
14//!   let q: Queue<i32> = Queue::new(path).unwrap();
15//!   q.enqueue(123).unwrap();
16//!   let v = q.dequeue().unwrap();
17//!   assert_eq!(v, Some(123));
18//! ```
19use fs4::FileExt;
20use std::marker::PhantomData;
21use thiserror::Error;
22
/// Re-exports of [serde] and [serde_json] so that consumers can
/// ensure that they use a compatible version.
25pub use serde;
26pub use serde_json;
27
/// Name of the subdirectory (inside the queue directory) that holds the
/// entries that are ready to be dequeued.
const ENTRIES_DIRECTORY: &str = "entries";
/// Name of the subdirectory (inside the queue directory) in which new
/// entries are written before they are renamed into the entries
/// directory.
const TMP_DIRECTORY: &str = "tmp";
30
/// Errors returned by the queue operations.
#[derive(Debug, Error)]
pub enum Error {
    /// Returned by [Queue::new] when the given path already exists.
    #[error("The given directory already exists")]
    DirectoryAlreadyExists,
    /// Returned by [Queue::open] when the path is not a directory or
    /// lacks the expected `entries`/`tmp` subdirectories.
    #[error("The given directory is not a valid queue directory")]
    NotAQueueDirectory,
    /// An item could not be serialized to, or deserialized from, JSON.
    #[error("Failed to JSON serialize/deserialize: {0:?}")]
    JSONSerialize(#[from] serde_json::Error),
    /// An underlying filesystem operation failed.
    #[error("I/O error: {0:?}")]
    IO(#[from] std::io::Error),
    /// An entry was written with a data version other than the one this
    /// library version understands. Carries the version found on disk.
    #[error("The data in the queue has an incompatible data format: {0}")]
    IncompatibleQueueDataVersion(u32),
}
44
/// Crate-internal result alias whose error type defaults to [Error].
type Result<T, E = Error> = std::result::Result<T, E>;
46
/// Instance of a queue returning elements of type `T`.
///
/// The queue only stores the path of its directory; all items live on
/// disk. `T` is tracked purely at the type level.
///
/// Example usage:
/// ```rust
///   # let t = tempfile::tempdir().unwrap();
///   # let path = t.path().join("foo").to_path_buf();
///   use diskqueue::Queue;
///   let q: Queue<i32> = Queue::new(path).unwrap();
///   q.enqueue(123).unwrap();
///   let v = q.dequeue().unwrap();
///   assert_eq!(v, Some(123));
/// ```
pub struct Queue<T> {
    /// Root directory of the queue.
    path: std::path::PathBuf,
    /// Ties the queue to its item type without storing a `T`.
    _phantom_data: PhantomData<T>,
}

// Implemented by hand (instead of derived) so that `Queue<T>` is
// `Debug` regardless of whether `T` is. This allows e.g. calling
// `unwrap_err()` on the result of the constructors.
impl<T> std::fmt::Debug for Queue<T> {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("Queue").field("path", &self.path).finish()
    }
}
63
/// Version number stamped onto every entry written by this version of
/// the library; entries carrying a different version are rejected when
/// dequeuing.
const fn current_version() -> u32 {
    const DATA_VERSION: u32 = 1;
    DATA_VERSION
}
67
/// Internal wrapper structure for the data that is being stored on
/// disk. This allows us to reject data that doesn't match our
/// expected data version (e.g. multiple consumers of different
/// versions).
///
/// The field names are part of the serialized representation.
#[derive(serde::Serialize, serde::Deserialize)]
struct Payload<T> {
    /// Data version the entry was written with; compared against
    /// `current_version()` when an entry is dequeued.
    version: u32,
    /// The item supplied by the user of the queue.
    data: T,
}
77
78impl<T> Payload<T> {
79    #[inline(always)]
80    const fn new(payload: T) -> Self {
81        Self {
82            version: current_version(),
83            data: payload,
84        }
85    }
86
87    #[inline]
88    fn get(self) -> T {
89        self.data
90    }
91}
92
93impl<T> Queue<T>
94where
95    T: serde::de::DeserializeOwned + serde::Serialize,
96{
97    /// Try opening a queue at the given location or create a new
98    /// queue directory.
99    pub fn open_or_create(path: std::path::PathBuf) -> Result<Self> {
100        if let Ok(s) = Self::open(path.clone()) {
101            return Ok(s);
102        }
103
104        Self::new(path)
105    }
106
107    /// Create a new queue directory at the given path. The path must
108    /// not exist.
109    pub fn new(path: std::path::PathBuf) -> Result<Self> {
110        if path.exists() {
111            return Err(Error::DirectoryAlreadyExists);
112        }
113
114        std::fs::create_dir(&path)?;
115        std::fs::create_dir(path.join(TMP_DIRECTORY))?;
116        std::fs::create_dir(path.join(ENTRIES_DIRECTORY))?;
117
118        Ok(Self {
119            path,
120            _phantom_data: Default::default(),
121        })
122    }
123
124    /// Create a new queue instance for the given path. If the path
125    /// doesn't follow the queue directory structure an error is
126    /// returned.
127    pub fn open(path: std::path::PathBuf) -> Result<Self> {
128        if !path.is_dir() {
129            return Err(Error::NotAQueueDirectory);
130        }
131
132        if !path.join(ENTRIES_DIRECTORY).is_dir() {
133            return Err(Error::NotAQueueDirectory);
134        }
135
136        if !path.join(TMP_DIRECTORY).is_dir() {
137            return Err(Error::NotAQueueDirectory);
138        }
139
140        Ok(Self {
141            path,
142            _phantom_data: Default::default(),
143        })
144    }
145
146    /// Add the given item to the queue.
147    /// Returns the unique id of the item.
148    pub fn enqueue(&self, payload: T) -> Result<u64> {
149        // We try a couple of times until we have a random "unique"
150        // id that doesn't already exist in the temporary directory.
151        // Once that file has been written we try to move/rename it
152        // into the actual entries directory.
153        // FIXME: Is renaming on windows/MacOS atomic? Would locking
154        // the file prevent it being consumed before the rename has
155        // finished?
156        loop {
157            let uid = rand::random::<u64>();
158            let uids = uid.to_string();
159            let path = self.path.join(TMP_DIRECTORY).join(&uids);
160            if path.exists() {
161                continue;
162            }
163            let mut fh = match std::fs::File::create(&path) {
164                Err(e) if e.kind() == std::io::ErrorKind::AlreadyExists => {
165                    continue;
166                }
167                Ok(o) => o,
168                Err(e) => return Err(e.into()),
169            };
170            let payload = Payload::new(payload);
171            serde_json::to_writer(&mut fh, &payload)?;
172
173            std::fs::rename(path, self.path.join(ENTRIES_DIRECTORY).join(uids))?;
174
175            return Ok(uid);
176        }
177    }
178
179    /// Consume a single item from the queue. Returns `Ok(None)` if
180    /// there was no item that could be consumed, otherwise returns
181    /// `Ok(Some(T))`.  Retries up to 5 times on transient errors
182    /// e.g. another process having locked or removed the item
183    /// already. If it fails to read five times Ok(None) is returned.
184    pub fn dequeue(&self) -> Result<Option<T>> {
185        let mut counter = 0;
186        loop {
187            if counter >= 5 {
188                // We failed to read a file more than 5 times, just give up
189                return Ok(None);
190            }
191            counter += 1;
192            return match self.dequeue_try_once() {
193                Err(Error::IO(e)) if e.kind() == std::io::ErrorKind::NotFound => {
194                    continue;
195                }
196                Err(Error::IO(e)) if e.kind() == fs4::lock_contended_error().kind() => {
197                    continue;
198                }
199                Err(e) => return Err(e),
200                Ok(r) => Ok(r),
201            };
202        }
203    }
204
205    /// Tries to dequeue a single item from the queue. Compared to
206    /// [Queue::dequeue] this function doesn't retry and might return
207    /// transient errors.
208    /// Returns `Ok(Some(T))` on success and `Ok(None)` when the queue
209    /// is empty.
210    pub fn dequeue_try_once(&self) -> Result<Option<T>> {
211        let p = self.path.join(ENTRIES_DIRECTORY);
212        // FIXME: read_dir currently reads up to some larger amount of
213        // data from the kernel. Since we are only interested in one
214        // entry we should be using another rust binding or interact
215        // with the OS calls directly (hopefully not).
216        // For now this is "okay" as I don't expect high-volume users.
217        let d = match std::fs::read_dir(&p)?.next() {
218            Some(p) => p?.path(),
219            None => return Ok(None),
220        };
221
222        let mut fh = std::fs::File::open(&d)?;
223
224        // We must lock the file before moving any further. It could
225        // be that another process has already consumed it but not yet
226        // unliked it.
227        // IDEA: Should be unlink before we have finished reading?
228        // Crashing readers would then "eat" data but we could move
229        // them to some "scratch" location?
230        if let Err(e) = fh.try_lock_exclusive() {
231            return Err(e.into());
232        }
233        let data: Payload<T> = serde_json::from_reader(&mut fh)?;
234
235        if data.version != current_version() {
236            return Err(Error::IncompatibleQueueDataVersion(data.version));
237        }
238
239        std::fs::remove_file(&d)?;
240
241        Ok(Some(data.get()))
242    }
243}
244
#[cfg(test)]
mod tests {
    use super::*;

    /// Creates an empty queue in a subdirectory of a fresh temporary
    /// directory. The returned guard has to be kept alive for as long
    /// as the queue is in use, as dropping it removes the directory.
    fn fresh_queue<T>() -> (tempfile::TempDir, Queue<T>)
    where
        T: serde::de::DeserializeOwned + serde::Serialize,
    {
        let dir = tempfile::tempdir().unwrap();
        let queue = Queue::new(dir.path().join("dir")).unwrap();
        (dir, queue)
    }

    #[test]
    fn queue_dequeue_try_once_new_path() {
        let (_dir, queue) = fresh_queue::<u32>();
        let first_id = queue.enqueue(32).unwrap();
        let second_id = queue.enqueue(128).unwrap();
        assert_ne!(first_id, second_id);

        // The queue is unordered, so only check that both items show up.
        let popped = [
            queue.dequeue_try_once().unwrap(),
            queue.dequeue_try_once().unwrap(),
        ];
        assert!(popped.contains(&Some(32)));
        assert!(popped.contains(&Some(128)));
        assert_eq!(queue.dequeue_try_once().unwrap(), None);
    }

    #[test]
    fn queue_dequeue_new_path() {
        let (_dir, queue) = fresh_queue::<String>();

        assert!(matches!(queue.dequeue(), Ok(None)));

        queue.enqueue("foo".to_owned()).unwrap();
        queue.enqueue("bar".to_owned()).unwrap();

        let popped = [queue.dequeue().unwrap(), queue.dequeue().unwrap()];
        assert!(popped.contains(&Some("foo".to_owned())));
        assert!(popped.contains(&Some("bar".to_owned())));

        assert!(matches!(queue.dequeue(), Ok(None)));
    }

    #[test]
    fn queue_dequeue_already_locked() {
        let (dir, queue) = fresh_queue::<String>();

        assert!(matches!(queue.dequeue(), Ok(None)));

        let id = queue.enqueue("foo".to_owned()).unwrap();

        // Hold an exclusive lock on the entry, like a concurrent
        // consumer would while it reads the item.
        let entry_path = dir
            .path()
            .join("dir")
            .join(ENTRIES_DIRECTORY)
            .join(id.to_string());
        let lock_holder = std::fs::File::open(entry_path).unwrap();
        lock_holder.lock_exclusive().unwrap();

        assert_eq!(queue.dequeue().unwrap(), None);

        // Once the lock is released the item can be consumed.
        drop(lock_holder);
        assert_eq!(queue.dequeue().unwrap(), Some("foo".to_string()));
    }
}
307}