1use queue_file::QueueFile;
2use serde::de::DeserializeOwned;
3use serde::Serialize;
4use std::io::{Error as IOError, ErrorKind as IOErrorKind, Result as IOResult};
5use std::marker::PhantomData;
6use std::path::Path;
7
8pub trait IBincodeSerializable<T = Self>
9where
10 Self: DeserializeOwned + Serialize + Clone + Send + Sized,
11{
12 fn from_bincode(bincode_slice: &[u8]) -> Option<Self> {
13 bincode::deserialize(bincode_slice).ok()
14 }
15
16 fn to_bincode(&self) -> Vec<u8> {
17 bincode::serialize(self).unwrap()
18 }
19}
20
21pub struct PQBincode<T>
22where
23 T: IBincodeSerializable,
24{
25 qfq: QueueFile,
26 file_path: String,
27 marker: PhantomData<T>,
28}
29
30impl<T> PQBincode<T>
31where
32 T: IBincodeSerializable,
33{
34 pub fn new<S: AsRef<Path> + ToString>(path: S) -> IOResult<Self> {
35 let file_path = path.to_string();
36
37 match QueueFile::open(path) {
38 Err(error_result) => Err(IOError::new(IOErrorKind::Other, error_result.to_string())),
39 Ok(qfq) => Ok(Self {
40 qfq,
41 file_path,
42 marker: Default::default(),
43 }),
44 }
45 }
46
47 pub fn enqueue(&mut self, data: T) -> IOResult<()> {
48 let data_bin = data.to_bincode();
49
50 match self.qfq.add(&data_bin[..]) {
51 Err(error_result) => Err(IOError::new(IOErrorKind::Other, error_result.to_string())),
52 Ok(_) => Ok(()),
53 }
54 }
55
56 pub fn enqueue_all(&mut self, data: Vec<T>) -> IOResult<()> {
57 for data_item in data {
58 self.enqueue(data_item)?;
59 }
60
61 Ok(())
62 }
63
64 pub fn dequeue(&mut self) -> IOResult<Option<T>> {
65 let mut dequeued_item = None;
66 self.cancellable_dequeue(|next_item| {
67 dequeued_item = Some(next_item);
68 true
69 })?;
70
71 Ok(dequeued_item)
72 }
73
74 pub fn dequeue_all(&mut self) -> IOResult<Vec<T>> {
75 let mut dequeued_items = Vec::new();
76
77 while self.cancellable_dequeue(|next_item| {
78 dequeued_items.push(next_item);
79 true
80 })? {}
81
82 Ok(dequeued_items)
83 }
84
85 pub fn cancellable_dequeue<F>(&mut self, doubtful_dequeue_task: F) -> IOResult<bool>
86 where
87 F: FnOnce(T) -> bool,
88 {
89 match self.qfq.peek() {
90 Err(error_result) => Err(IOError::new(IOErrorKind::Other, error_result.to_string())),
91 Ok(data_bin) => {
92 if data_bin.is_none() {
93 return Ok(false);
94 }
95
96 let data_bin = data_bin.unwrap();
97
98 match T::from_bincode(&data_bin) {
99 None => Err(IOError::new(
100 IOErrorKind::InvalidData,
101 "Cannot deserialize data, invalid format!",
102 )),
103 Some(data) => {
104 if doubtful_dequeue_task(data) {
105 if let Err(error_result) = self.qfq.remove() {
106 return Err(IOError::new(IOErrorKind::Other, error_result.to_string()));
107 }
108
109 Ok(true)
110 } else {
111 Ok(false)
112 }
113 }
114 }
115 }
116 }
117 }
118
119 pub fn count(&self) -> usize {
120 self.qfq.size()
121 }
122
123 pub fn get_persistent_path(&self) -> String {
124 self.file_path.clone()
125 }
126}