1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
use std::borrow::Cow;
use std::collections::HashMap;
use std::ops::RangeBounds;
use crate::error::{AlreadyExists, AppendError, MissingQueue, TouchError};
use crate::mem::MemQueue;
use crate::rolling::FileNumber;
#[derive(Default)]
pub struct MemQueues {
queues: HashMap<String, MemQueue>,
}
impl MemQueues {
/// The file number argument is here unused. Its point is just to make sure we
/// flushed the file before updating the in memory queue.
pub fn create_queue(&mut self, queue: &str) -> Result<(), AlreadyExists> {
if self.queues.contains_key(queue) {
return Err(AlreadyExists);
}
self.queues.insert(queue.to_string(), MemQueue::default());
Ok(())
}
pub fn delete_queue(&mut self, queue: &str) -> Result<(), MissingQueue> {
if self.queues.remove(queue).is_none() {
return Err(MissingQueue(queue.to_string()));
}
Ok(())
}
/// Returns all sub-queues which are currently empty.
pub fn empty_queues(&mut self) -> impl Iterator<Item = (&'_ str, &mut MemQueue)> + '_ {
self.queues.iter_mut().filter_map(|(queue, mem_queue)| {
if mem_queue.is_empty() {
Some((queue.as_str(), mem_queue))
} else {
None
}
})
}
pub fn range<R>(
&self,
queue: &str,
range: R,
) -> Option<impl Iterator<Item = (u64, Cow<[u8]>)> + '_>
where
R: RangeBounds<u64> + 'static,
{
// We do not rely on `entry` in order to avoid
// the allocation.
Some(self.queues.get(queue)?.range(range))
}
fn get_queue(&self, queue: &str) -> Result<&MemQueue, MissingQueue> {
// We do not rely on `entry` in order to avoid
// the allocation.
self.queues
.get(queue)
.ok_or_else(|| MissingQueue(queue.to_string()))
}
fn get_queue_mut(&mut self, queue: &str) -> Result<&mut MemQueue, MissingQueue> {
// We do not rely on `entry` in order to avoid
// the allocation.
self.queues
.get_mut(queue)
.ok_or_else(|| MissingQueue(queue.to_string()))
}
pub fn append_record(
&mut self,
queue: &str,
file_number: &FileNumber,
target_position: u64,
payload: &[u8],
) -> Result<(), AppendError> {
self.get_queue_mut(queue)?
.append_record(file_number, target_position, payload)?;
Ok(())
}
pub fn contains_queue(&self, queue: &str) -> bool {
self.queues.contains_key(queue)
}
pub fn list_queues(&self) -> impl Iterator<Item = &str> {
self.queues.keys().map(|queue| queue.as_str())
}
/// Ensure that the queue is empty and start_position = next_position.
///
/// Returns an error if the queue already exists and contains elements,
/// or is empty but has a next_position that does not match.
pub fn ack_position(&mut self, queue: &str, next_position: u64) -> Result<(), TouchError> {
if let Some(queue) = self.queues.get(queue) {
// It is possible for `ack_position` to be called when a queue already exists.
//
// For instance, we may have recorded the position of an empty stale queue
// twice in the same file. Nothing prevents that from happening today.
//
// Another possibility is if an IO error occured right after recording position
// and before deleting files.
if !queue.is_empty() || queue.next_position() != next_position {
return Err(TouchError);
}
} else {
// The queue does not exist! Let's create it and set the right `next_position`.
self.queues.insert(
queue.to_string(),
MemQueue::with_next_position(next_position),
);
}
Ok(())
}
pub fn next_position(&self, queue: &str) -> Result<u64, MissingQueue> {
Ok(self.get_queue(queue)?.next_position())
}
/// Removes records up to the supplied `position`,
/// including the position itself.
//
/// If there are no records `<= position`, the method will
/// not do anything.
///
/// If one or more files should be removed,
/// returns the range of the files that should be removed
pub fn truncate(&mut self, queue: &str, position: u64) {
if let Ok(queue) = self.get_queue_mut(queue) {
queue.truncate(position);
}
}
pub fn size(&self) -> usize {
self.queues
.iter()
.map(|(name, queue)| name.len() + queue.size())
.sum()
}
}