pq_bincode/
lib.rs

1use queue_file::QueueFile;
2use serde::de::DeserializeOwned;
3use serde::Serialize;
4use std::io::{Error as IOError, ErrorKind as IOErrorKind, Result as IOResult};
5use std::marker::PhantomData;
6use std::path::Path;
7
8pub trait IBincodeSerializable<T = Self>
9where
10    Self: DeserializeOwned + Serialize + Clone + Send + Sized,
11{
12    fn from_bincode(bincode_slice: &[u8]) -> Option<Self> {
13        bincode::deserialize(bincode_slice).ok()
14    }
15
16    fn to_bincode(&self) -> Vec<u8> {
17        bincode::serialize(self).unwrap()
18    }
19}
20
21pub struct PQBincode<T>
22where
23    T: IBincodeSerializable,
24{
25    qfq: QueueFile,
26    file_path: String,
27    marker: PhantomData<T>,
28}
29
30impl<T> PQBincode<T>
31where
32    T: IBincodeSerializable,
33{
34    pub fn new<S: AsRef<Path> + ToString>(path: S) -> IOResult<Self> {
35        let file_path = path.to_string();
36
37        match QueueFile::open(path) {
38            Err(error_result) => Err(IOError::new(IOErrorKind::Other, error_result.to_string())),
39            Ok(qfq) => Ok(Self {
40                qfq,
41                file_path,
42                marker: Default::default(),
43            }),
44        }
45    }
46
47    pub fn enqueue(&mut self, data: T) -> IOResult<()> {
48        let data_bin = data.to_bincode();
49
50        match self.qfq.add(&data_bin[..]) {
51            Err(error_result) => Err(IOError::new(IOErrorKind::Other, error_result.to_string())),
52            Ok(_) => Ok(()),
53        }
54    }
55
56    pub fn enqueue_all(&mut self, data: Vec<T>) -> IOResult<()> {
57        for data_item in data {
58            self.enqueue(data_item)?;
59        }
60
61        Ok(())
62    }
63
64    pub fn dequeue(&mut self) -> IOResult<Option<T>> {
65        let mut dequeued_item = None;
66        self.cancellable_dequeue(|next_item| {
67            dequeued_item = Some(next_item);
68            true
69        })?;
70
71        Ok(dequeued_item)
72    }
73
74    pub fn dequeue_all(&mut self) -> IOResult<Vec<T>> {
75        let mut dequeued_items = Vec::new();
76
77        while self.cancellable_dequeue(|next_item| {
78            dequeued_items.push(next_item);
79            true
80        })? {}
81
82        Ok(dequeued_items)
83    }
84
85    pub fn cancellable_dequeue<F>(&mut self, doubtful_dequeue_task: F) -> IOResult<bool>
86    where
87        F: FnOnce(T) -> bool,
88    {
89        match self.qfq.peek() {
90            Err(error_result) => Err(IOError::new(IOErrorKind::Other, error_result.to_string())),
91            Ok(data_bin) => {
92                if data_bin.is_none() {
93                    return Ok(false);
94                }
95
96                let data_bin = data_bin.unwrap();
97
98                match T::from_bincode(&data_bin) {
99                    None => Err(IOError::new(
100                        IOErrorKind::InvalidData,
101                        "Cannot deserialize data, invalid format!",
102                    )),
103                    Some(data) => {
104                        if doubtful_dequeue_task(data) {
105                            if let Err(error_result) = self.qfq.remove() {
106                                return Err(IOError::new(IOErrorKind::Other, error_result.to_string()));
107                            }
108
109                            Ok(true)
110                        } else {
111                            Ok(false)
112                        }
113                    }
114                }
115            }
116        }
117    }
118
119    pub fn count(&self) -> usize {
120        self.qfq.size()
121    }
122
123    pub fn get_persistent_path(&self) -> String {
124        self.file_path.clone()
125    }
126}