journal_registry/repository/
collection.rs1use crate::repository::error::Result;
2use crate::repository::{File, Origin, Status};
3use journal_common::Seconds;
4use journal_common::collections::{HashMap, VecDeque};
5use tracing::error;
6
7#[derive(Debug, Clone, Default)]
17#[cfg_attr(feature = "allocative", derive(allocative::Allocative))]
18pub struct Chain {
19 pub(crate) files: VecDeque<File>,
21}
22
23impl Chain {
24 pub fn insert_file(&mut self, file: File) {
26 let pos = self.files.partition_point(|f| *f < file);
27
28 if pos < self.files.len() && self.files[pos] == file {
29 return;
30 }
31
32 self.files.insert(pos, file.clone());
33 }
34
35 pub fn remove_file(&mut self, file: &File) {
37 let pos = self.files.partition_point(|f| f < file);
39
40 if pos < self.files.len() && self.files[pos] == *file {
42 self.files.remove(pos);
43 }
44 }
45
46 pub fn pop_front(&mut self) -> Option<File> {
47 self.files.pop_front()
48 }
49
50 pub fn back(&self) -> Option<&File> {
51 self.files.back()
52 }
53
54 pub fn is_empty(&self) -> bool {
55 self.files.is_empty()
56 }
57
58 pub fn len(&self) -> usize {
59 self.files.len()
60 }
61
62 pub fn drain(&mut self, cutoff_time: u64) -> impl Iterator<Item = File> + '_ {
66 let mut drained = Vec::new();
67 let mut retained = VecDeque::new();
68
69 while let Some(file) = self.files.pop_front() {
70 let should_drain = match file.status() {
71 Status::Active => false,
72 Status::Archived { head_realtime, .. } => *head_realtime <= cutoff_time,
73 Status::Disposed { timestamp, .. } => *timestamp <= cutoff_time,
74 };
75
76 if should_drain {
77 drained.push(file);
78 } else {
79 retained.push_back(file);
80 }
81 }
82
83 self.files = retained;
84 drained.into_iter()
85 }
86
87 pub fn find_files_in_range<C>(&self, start: Seconds, end: Seconds, files: &mut C)
91 where
92 C: Extend<File>,
93 {
94 if self.files.is_empty() || start >= end {
95 return;
96 }
97
98 let start = seconds_to_microseconds(start);
99 let end = seconds_to_microseconds(end);
100 let pos = self.first_range_candidate_position(start);
101 let mut prev_head_realtime = self.archived_head_at(pos);
102 let mut iter = self.files.iter().skip(pos).peekable();
103
104 while let Some(file) = iter.next() {
105 match file.status() {
106 Status::Archived { head_realtime, .. } => {
107 if *head_realtime >= end {
108 break;
109 }
110
111 let tail_realtime = next_file_tail_realtime(iter.peek().copied());
112 if range_overlaps(*head_realtime, tail_realtime, start, end) {
113 files.extend(std::iter::once(file.clone()));
114 }
115 prev_head_realtime = Some(*head_realtime);
116 }
117 Status::Active => {
118 let head_realtime = prev_head_realtime.unwrap_or(u64::MIN);
119 if range_overlaps(head_realtime, u64::MAX, start, end) {
120 files.extend(std::iter::once(file.clone()));
121 }
122 break;
123 }
124 Status::Disposed { .. } => {
125 continue;
126 }
127 }
128 }
129 }
130
131 fn first_range_candidate_position(&self, start: u64) -> usize {
132 self.files
133 .partition_point(|f| match f.status() {
134 Status::Active => false,
135 Status::Archived { head_realtime, .. } => *head_realtime < start,
136 Status::Disposed { .. } => true,
137 })
138 .saturating_sub(1)
139 }
140
141 fn archived_head_at(&self, pos: usize) -> Option<u64> {
142 match self.files.get(pos).map(|f| f.status()) {
143 Some(Status::Archived { head_realtime, .. }) => Some(*head_realtime),
144 _ => None,
145 }
146 }
147}
148
149fn seconds_to_microseconds(seconds: Seconds) -> u64 {
150 const USEC_PER_SEC: u64 = std::time::Duration::from_secs(1).as_micros() as u64;
151 seconds.0 as u64 * USEC_PER_SEC
152}
153
154fn next_file_tail_realtime(next_file: Option<&File>) -> u64 {
155 let Some(next_file) = next_file else {
156 return u64::MAX;
157 };
158 match next_file.status() {
159 Status::Archived { head_realtime, .. } => *head_realtime,
160 Status::Active => u64::MAX,
161 Status::Disposed { .. } => {
162 error!(
163 "Disposed file found after archived file, violating chain ordering: {:?}",
164 next_file.path()
165 );
166 u64::MAX
167 }
168 }
169}
170
/// Reports whether the half-open span `[head_realtime, tail_realtime)`
/// intersects the half-open window `[start, end)`.
///
/// Two half-open intervals intersect exactly when neither lies entirely on
/// one side of the other; merely touching endpoints is not an overlap.
fn range_overlaps(head_realtime: u64, tail_realtime: u64, start: u64, end: u64) -> bool {
    let span_ends_before_window = tail_realtime <= start;
    let span_starts_after_window = head_realtime >= end;
    !(span_ends_before_window || span_starts_after_window)
}
174
175#[derive(Default, Debug)]
176#[cfg_attr(feature = "allocative", derive(allocative::Allocative))]
177pub(super) struct Directory {
178 pub(super) chains: HashMap<Origin, Chain>,
179}
180
181#[derive(Default)]
194#[cfg_attr(feature = "allocative", derive(allocative::Allocative))]
195pub struct Repository {
196 pub(super) directories: HashMap<String, Directory>,
198}
199
200impl Repository {
201 pub fn insert(&mut self, file: File) -> Result<()> {
203 let dir = file.dir()?.to_string();
204
205 if let Some(directory) = self.directories.get_mut(&dir) {
206 if let Some(chain) = directory.chains.get_mut(file.origin()) {
207 chain.insert_file(file);
208 } else {
209 let origin = file.origin().clone();
210 let mut chain = Chain::default();
211 chain.insert_file(file);
212 directory.chains.insert(origin, chain);
213 }
214 } else {
215 let origin = file.origin().clone();
216 let mut chain = Chain::default();
217 chain.insert_file(file);
218
219 let mut directory = Directory::default();
220 directory.chains.insert(origin, chain);
221
222 self.directories.insert(dir, directory);
223 }
224 Ok(())
225 }
226
227 pub fn remove(&mut self, file: &File) -> Result<()> {
229 let dir = file.dir()?;
230 let mut remove_directory = false;
231
232 if let Some(directory) = self.directories.get_mut(dir) {
233 let mut remove_chain = false;
234
235 if let Some(chain) = directory.chains.get_mut(file.origin()) {
236 chain.remove_file(file);
237 remove_chain = chain.is_empty();
238 };
239
240 if remove_chain {
241 directory.chains.remove(file.origin());
242 }
243
244 remove_directory = directory.chains.is_empty();
245 };
246
247 if remove_directory {
248 self.directories.remove(dir);
249 }
250 Ok(())
251 }
252
253 pub fn remove_directory(&mut self, path: &str) {
255 self.directories.remove(path);
256 }
257
258 pub fn find_files_in_range<C>(&self, start: Seconds, end: Seconds) -> C
260 where
261 C: FromIterator<File> + Extend<File> + Default,
262 {
263 let mut files = C::default();
264
265 for directory in self.directories.values() {
266 for chain in directory.chains.values() {
267 chain.find_files_in_range(start, end, &mut files);
268 }
269 }
270
271 files
272 }
273}