Skip to main content

journal_registry/repository/
collection.rs

1use crate::repository::error::Result;
2use crate::repository::{File, Origin, Status};
3use journal_common::Seconds;
4use journal_common::collections::{HashMap, VecDeque};
5use tracing::error;
6
7/// An ordered collection of journal files from the same origin
8///
9/// Files are kept sorted by status and time:
10/// - Disposed files (corrupted) come first
11/// - Archived files follow in chronological order (by head_realtime)
12/// - Active file (if any) comes last
13///
14/// This ordering is maintained automatically by [`insert_file()`](Self::insert_file)
15/// and is critical for correct time-range queries.
16#[derive(Debug, Clone, Default)]
17#[cfg_attr(feature = "allocative", derive(allocative::Allocative))]
18pub struct Chain {
19    /// Ordered collection of files maintaining the sorting invariant
20    pub(crate) files: VecDeque<File>,
21}
22
23impl Chain {
24    /// Insert a file maintaining sorted order (disposed → archived → active)
25    pub fn insert_file(&mut self, file: File) {
26        let pos = self.files.partition_point(|f| *f < file);
27
28        if pos < self.files.len() && self.files[pos] == file {
29            return;
30        }
31
32        self.files.insert(pos, file.clone());
33    }
34
35    /// Remove a file from the chain
36    pub fn remove_file(&mut self, file: &File) {
37        // Use partition_point to find where the file would be
38        let pos = self.files.partition_point(|f| f < file);
39
40        // Check if the file at this position matches the one we want to remove
41        if pos < self.files.len() && self.files[pos] == *file {
42            self.files.remove(pos);
43        }
44    }
45
46    pub fn pop_front(&mut self) -> Option<File> {
47        self.files.pop_front()
48    }
49
50    pub fn back(&self) -> Option<&File> {
51        self.files.back()
52    }
53
54    pub fn is_empty(&self) -> bool {
55        self.files.is_empty()
56    }
57
58    pub fn len(&self) -> usize {
59        self.files.len()
60    }
61
62    /// Remove files older than cutoff time (microseconds since epoch)
63    ///
64    /// Active files are never drained.
65    pub fn drain(&mut self, cutoff_time: u64) -> impl Iterator<Item = File> + '_ {
66        let mut drained = Vec::new();
67        let mut retained = VecDeque::new();
68
69        while let Some(file) = self.files.pop_front() {
70            let should_drain = match file.status() {
71                Status::Active => false,
72                Status::Archived { head_realtime, .. } => *head_realtime <= cutoff_time,
73                Status::Disposed { timestamp, .. } => *timestamp <= cutoff_time,
74            };
75
76            if should_drain {
77                drained.push(file);
78            } else {
79                retained.push_back(file);
80            }
81        }
82
83        self.files = retained;
84        drained.into_iter()
85    }
86
87    /// Find files that overlap with the time range [start, end)
88    ///
89    /// Extends the provided collection with matching files.
90    pub fn find_files_in_range<C>(&self, start: Seconds, end: Seconds, files: &mut C)
91    where
92        C: Extend<File>,
93    {
94        if self.files.is_empty() || start >= end {
95            return;
96        }
97
98        let start = seconds_to_microseconds(start);
99        let end = seconds_to_microseconds(end);
100        let pos = self.first_range_candidate_position(start);
101        let mut prev_head_realtime = self.archived_head_at(pos);
102        let mut iter = self.files.iter().skip(pos).peekable();
103
104        while let Some(file) = iter.next() {
105            match file.status() {
106                Status::Archived { head_realtime, .. } => {
107                    if *head_realtime >= end {
108                        break;
109                    }
110
111                    let tail_realtime = next_file_tail_realtime(iter.peek().copied());
112                    if range_overlaps(*head_realtime, tail_realtime, start, end) {
113                        files.extend(std::iter::once(file.clone()));
114                    }
115                    prev_head_realtime = Some(*head_realtime);
116                }
117                Status::Active => {
118                    let head_realtime = prev_head_realtime.unwrap_or(u64::MIN);
119                    if range_overlaps(head_realtime, u64::MAX, start, end) {
120                        files.extend(std::iter::once(file.clone()));
121                    }
122                    break;
123                }
124                Status::Disposed { .. } => {
125                    continue;
126                }
127            }
128        }
129    }
130
131    fn first_range_candidate_position(&self, start: u64) -> usize {
132        self.files
133            .partition_point(|f| match f.status() {
134                Status::Active => false,
135                Status::Archived { head_realtime, .. } => *head_realtime < start,
136                Status::Disposed { .. } => true,
137            })
138            .saturating_sub(1)
139    }
140
141    fn archived_head_at(&self, pos: usize) -> Option<u64> {
142        match self.files.get(pos).map(|f| f.status()) {
143            Some(Status::Archived { head_realtime, .. }) => Some(*head_realtime),
144            _ => None,
145        }
146    }
147}
148
149fn seconds_to_microseconds(seconds: Seconds) -> u64 {
150    const USEC_PER_SEC: u64 = std::time::Duration::from_secs(1).as_micros() as u64;
151    seconds.0 as u64 * USEC_PER_SEC
152}
153
154fn next_file_tail_realtime(next_file: Option<&File>) -> u64 {
155    let Some(next_file) = next_file else {
156        return u64::MAX;
157    };
158    match next_file.status() {
159        Status::Archived { head_realtime, .. } => *head_realtime,
160        Status::Active => u64::MAX,
161        Status::Disposed { .. } => {
162            error!(
163                "Disposed file found after archived file, violating chain ordering: {:?}",
164                next_file.path()
165            );
166            u64::MAX
167        }
168    }
169}
170
/// Returns `true` when the half-open span `[head_realtime, tail_realtime)`
/// intersects the half-open query range `[start, end)`.
fn range_overlaps(head_realtime: u64, tail_realtime: u64, start: u64, end: u64) -> bool {
    let begins_before_query_end = head_realtime < end;
    let ends_after_query_start = tail_realtime > start;
    begins_before_query_end && ends_after_query_start
}
174
175#[derive(Default, Debug)]
176#[cfg_attr(feature = "allocative", derive(allocative::Allocative))]
177pub(super) struct Directory {
178    pub(super) chains: HashMap<Origin, Chain>,
179}
180
181/// A repository that organizes journal files by directory and origin
182///
183/// The repository maintains a three-level hierarchy:
184/// ```text
185/// Repository
186///   └─ Directory (/var/log/journal)
187///       └─ Origin (System, User(1000), etc.)
188///           └─ Chain (ordered list of files)
189/// ```
190///
191/// This structure allows efficient querying and management of journal files
192/// from multiple directories and origins.
193#[derive(Default)]
194#[cfg_attr(feature = "allocative", derive(allocative::Allocative))]
195pub struct Repository {
196    /// Maps journal directory paths to their contents
197    pub(super) directories: HashMap<String, Directory>,
198}
199
200impl Repository {
201    /// Insert a file into the appropriate directory/origin/chain
202    pub fn insert(&mut self, file: File) -> Result<()> {
203        let dir = file.dir()?.to_string();
204
205        if let Some(directory) = self.directories.get_mut(&dir) {
206            if let Some(chain) = directory.chains.get_mut(file.origin()) {
207                chain.insert_file(file);
208            } else {
209                let origin = file.origin().clone();
210                let mut chain = Chain::default();
211                chain.insert_file(file);
212                directory.chains.insert(origin, chain);
213            }
214        } else {
215            let origin = file.origin().clone();
216            let mut chain = Chain::default();
217            chain.insert_file(file);
218
219            let mut directory = Directory::default();
220            directory.chains.insert(origin, chain);
221
222            self.directories.insert(dir, directory);
223        }
224        Ok(())
225    }
226
227    /// Remove a file and clean up empty chains/directories
228    pub fn remove(&mut self, file: &File) -> Result<()> {
229        let dir = file.dir()?;
230        let mut remove_directory = false;
231
232        if let Some(directory) = self.directories.get_mut(dir) {
233            let mut remove_chain = false;
234
235            if let Some(chain) = directory.chains.get_mut(file.origin()) {
236                chain.remove_file(file);
237                remove_chain = chain.is_empty();
238            };
239
240            if remove_chain {
241                directory.chains.remove(file.origin());
242            }
243
244            remove_directory = directory.chains.is_empty();
245        };
246
247        if remove_directory {
248            self.directories.remove(dir);
249        }
250        Ok(())
251    }
252
253    /// Remove all files from a directory
254    pub fn remove_directory(&mut self, path: &str) {
255        self.directories.remove(path);
256    }
257
258    /// Collect all files in the given time range
259    pub fn find_files_in_range<C>(&self, start: Seconds, end: Seconds) -> C
260    where
261        C: FromIterator<File> + Extend<File> + Default,
262    {
263        let mut files = C::default();
264
265        for directory in self.directories.values() {
266            for chain in directory.chains.values() {
267                chain.find_files_in_range(start, end, &mut files);
268            }
269        }
270
271        files
272    }
273}