Skip to main content

journal_registry/repository/
file.rs

1use crate::repository::RepositoryError;
2use crate::repository::error::Result;
3use serde::{Deserialize, Serialize};
4use std::cmp::Ordering;
5use std::path::Path;
6use std::sync::Arc;
7use uuid::Uuid;
8
9/// Status of a journal file
10#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
11#[cfg_attr(feature = "allocative", derive(allocative::Allocative))]
12pub enum Status {
13    /// Active journal file currently being written to
14    Active,
15    /// Archived journal file that has been rotated and is no longer being written to
16    Archived {
17        /// Sequence number ID for ordering entries across files
18        #[cfg_attr(feature = "allocative", allocative(skip))]
19        seqnum_id: Uuid,
20        /// Sequence number of the first entry in this file
21        head_seqnum: u64,
22        /// Realtime timestamp (microseconds since epoch) of the first entry
23        head_realtime: u64,
24    },
25    /// Disposed (corrupted or incomplete) journal file marked for cleanup
26    Disposed {
27        /// Timestamp when the file was disposed (microseconds since epoch)
28        timestamp: u64,
29        /// Sequence number for ordering multiple disposed files
30        number: u64,
31    },
32}
33
34impl Ord for Status {
35    fn cmp(&self, other: &Self) -> Ordering {
36        match (self, other) {
37            // Disposed files come first, sorted by timestamp then number
38            (
39                Status::Disposed {
40                    timestamp: t1,
41                    number: n1,
42                },
43                Status::Disposed {
44                    timestamp: t2,
45                    number: n2,
46                },
47            ) => t1.cmp(t2).then_with(|| n1.cmp(n2)),
48
49            // Disposed always comes before non-disposed
50            (Status::Disposed { .. }, _) => Ordering::Less,
51            (_, Status::Disposed { .. }) => Ordering::Greater,
52
53            // Archived files sorted by head_realtime (then seqnum for stability)
54            (
55                Status::Archived {
56                    seqnum_id: lhs_seqnum_id,
57                    head_seqnum: lhs_head_seqnum,
58                    head_realtime: lhs_head_realtime,
59                },
60                Status::Archived {
61                    seqnum_id: rhs_seqnum_id,
62                    head_seqnum: rhs_head_seqnum,
63                    head_realtime: rhs_head_realtime,
64                },
65            ) => lhs_head_realtime
66                .cmp(rhs_head_realtime)
67                .then_with(|| lhs_seqnum_id.cmp(rhs_seqnum_id))
68                .then_with(|| lhs_head_seqnum.cmp(rhs_head_seqnum)),
69
70            // Archived comes before Active
71            (Status::Archived { .. }, Status::Active) => Ordering::Less,
72            (Status::Active, Status::Archived { .. }) => Ordering::Greater,
73
74            // Active files are equal in terms of status ordering
75            (Status::Active, Status::Active) => Ordering::Equal,
76        }
77    }
78}
79
80impl PartialOrd for Status {
81    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
82        Some(self.cmp(other))
83    }
84}
85
86impl Status {
87    /// Parse the journal file status from the end of the path, returning the status and the remaining path
88    pub(super) fn parse(path: &str) -> Option<(Self, &str)> {
89        if let Some(stem) = path.strip_suffix(".journal") {
90            return Self::parse_journal_stem(stem);
91        }
92        let stem = path.strip_suffix(".journal~")?;
93        Self::parse_disposed_stem(stem)
94    }
95
96    fn parse_journal_stem(stem: &str) -> Option<(Self, &str)> {
97        if let Some((prefix, suffix)) = stem.rsplit_once('@') {
98            return Self::parse_archived_suffix(prefix, suffix);
99        }
100        Some((Status::Active, stem))
101    }
102
103    fn parse_archived_suffix<'a>(prefix: &'a str, suffix: &str) -> Option<(Self, &'a str)> {
104        let mut parts = suffix.split('-');
105        let seqnum_id = Uuid::try_parse(parts.next()?).ok()?;
106        let head_seqnum = u64::from_str_radix(parts.next()?, 16).ok()?;
107        let head_realtime = u64::from_str_radix(parts.next()?, 16).ok()?;
108        if parts.next().is_some() {
109            return None;
110        }
111
112        Some((
113            Status::Archived {
114                seqnum_id,
115                head_seqnum,
116                head_realtime,
117            },
118            prefix,
119        ))
120    }
121
122    fn parse_disposed_stem(stem: &str) -> Option<(Self, &str)> {
123        let (prefix, suffix) = stem.rsplit_once('@')?;
124        let (timestamp, number) = suffix.rsplit_once('-')?;
125        let timestamp = u64::from_str_radix(timestamp, 16).ok()?;
126        let number = u64::from_str_radix(number, 16).ok()?;
127        Some((Status::Disposed { timestamp, number }, prefix))
128    }
129}
130
131/// Source of journal entries
132#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
133#[cfg_attr(feature = "allocative", derive(allocative::Allocative))]
134pub enum Source {
135    /// System-wide journal (system.journal)
136    System,
137    /// User-specific journal with the given UID
138    User(u32),
139    /// Journal from a remote host
140    Remote(String),
141    /// Unknown or non-standard journal type
142    Unknown(String),
143}
144
145impl Source {
146    /// Parse the journal basename from the end of the path, returning the basename and the remaining path
147    pub(super) fn parse(path: &str) -> Option<(Self, &str)> {
148        // Split on the last '/' to get directory and basename
149        let (dir_path, basename) = path.rsplit_once('/')?;
150
151        let journal_type = if basename == "system" {
152            Source::System
153        } else if let Some(uid_str) = basename.strip_prefix("user-") {
154            if let Ok(uid) = uid_str.parse::<u32>() {
155                Source::User(uid)
156            } else {
157                Source::Unknown(basename.to_string())
158            }
159        } else if let Some(remote_host) = basename.strip_prefix("remote-") {
160            Source::Remote(remote_host.to_string())
161        } else {
162            Source::Unknown(basename.to_string())
163        };
164
165        Some((journal_type, dir_path))
166    }
167}
168
169/// Origin identifies where a journal file comes from
170#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
171#[cfg_attr(feature = "allocative", derive(allocative::Allocative))]
172pub struct Origin {
173    /// Machine ID from which the journal originates
174    #[cfg_attr(feature = "allocative", allocative(skip))]
175    pub machine_id: Option<Uuid>,
176    /// Optional namespace for isolated journal instances
177    pub namespace: Option<String>,
178    /// Source type (system, user, remote, or unknown)
179    pub source: Source,
180}
181
182#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
183#[cfg_attr(feature = "allocative", derive(allocative::Allocative))]
184pub(crate) struct FileInner {
185    pub(crate) path: String,
186    pub(crate) origin: Origin,
187    pub(crate) status: Status,
188}
189
190#[derive(Debug, Clone, PartialEq, Eq, Hash)]
191#[cfg_attr(feature = "allocative", derive(allocative::Allocative))]
192pub struct File {
193    pub(super) inner: Arc<FileInner>,
194}
195
196impl serde::Serialize for File {
197    fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, S::Error>
198    where
199        S: serde::Serializer,
200    {
201        self.inner.as_ref().serialize(serializer)
202    }
203}
204
205impl<'de> serde::Deserialize<'de> for File {
206    fn deserialize<D>(deserializer: D) -> std::result::Result<Self, D::Error>
207    where
208        D: serde::Deserializer<'de>,
209    {
210        let inner = FileInner::deserialize(deserializer)?;
211        Ok(File {
212            inner: Arc::new(inner),
213        })
214    }
215}
216
217impl File {
218    pub fn path(&self) -> &str {
219        &self.inner.path
220    }
221
222    pub fn origin(&self) -> &Origin {
223        &self.inner.origin
224    }
225
226    pub fn status(&self) -> &Status {
227        &self.inner.status
228    }
229
230    pub fn from_path(path: &Path) -> Option<Self> {
231        if !path.is_absolute() {
232            return None;
233        }
234
235        let path_str = path.to_str()?;
236        let filename = path.file_name()?.to_str()?;
237        let filename_path = format!("/{filename}");
238        let (status, path_after_status) = Status::parse(&filename_path)?;
239        let (source, _) = Source::parse(path_after_status)?;
240
241        let (machine_id, namespace) = path
242            .parent()
243            .and_then(|parent| parent.file_name())
244            .and_then(|dirname| dirname.to_str())
245            .map(parse_machine_id_namespace)
246            .unwrap_or((None, None));
247
248        let origin = Origin {
249            machine_id,
250            namespace,
251            source,
252        };
253
254        let inner = Arc::new(FileInner {
255            path: path_str.to_string(),
256            origin,
257            status,
258        });
259
260        Some(File { inner })
261    }
262
263    pub fn from_raw_path(path: &Path) -> Option<Self> {
264        let path = path.to_str()?;
265        let raw_path = Path::new(path);
266        if !raw_path.is_absolute() {
267            return None;
268        }
269
270        let inner = Arc::new(FileInner {
271            path: path.to_string(),
272            origin: Origin {
273                machine_id: None,
274                namespace: None,
275                source: Source::Unknown(
276                    raw_path
277                        .file_stem()
278                        .and_then(|stem| stem.to_str())
279                        .unwrap_or("journal")
280                        .to_string(),
281                ),
282            },
283            status: Status::Active,
284        });
285
286        Some(File { inner })
287    }
288
289    #[allow(clippy::should_implement_trait)]
290    pub fn from_str(path: &str) -> Option<Self> {
291        // We only accept absolute paths
292        if !path.starts_with("/") {
293            return None;
294        }
295
296        // Parse from right to left
297        let (status, path_after_status) = Status::parse(path)?;
298        let (source, path_after_source) = Source::parse(path_after_status)?;
299
300        // Try to parse machine ID and namespace from the directory name
301        let (machine_id, namespace) = if !path_after_source.is_empty() {
302            // Get the last directory component
303            let dirname = if let Some((_parent, dir)) = path_after_source.rsplit_once('/') {
304                dir
305            } else {
306                path_after_source
307            };
308
309            if let Some((id_str, ns)) = dirname.split_once('.') {
310                // Has namespace
311                let machine_id = Uuid::try_parse(id_str).ok()?;
312                (Some(machine_id), Some(ns.to_string()))
313            } else {
314                // No namespace, just machine ID
315                let machine_id = Uuid::try_parse(dirname).ok();
316                (machine_id, None)
317            }
318        } else {
319            (None, None)
320        };
321
322        let origin = Origin {
323            machine_id,
324            namespace,
325            source,
326        };
327
328        let inner = Arc::new(FileInner {
329            path: String::from(path),
330            origin,
331            status,
332        });
333
334        Some(File { inner })
335    }
336
337    pub fn dir(&self) -> Result<&str> {
338        Path::new(&self.inner.path)
339            .parent()
340            .and_then(|p| {
341                if self.inner.origin.machine_id.is_some() {
342                    p.parent()
343                } else {
344                    Some(p)
345                }
346            })
347            .and_then(|p| p.to_str())
348            .ok_or_else(|| RepositoryError::InvalidUtf8 {
349                path: Path::new(&self.inner.path).to_path_buf(),
350            })
351    }
352
353    /// Check if a path looks like a journal file
354    pub fn is_journal_file(path: &str) -> bool {
355        path.ends_with(".journal") || path.ends_with(".journal~")
356    }
357
358    /// Check if this is an active journal file that's currently being written to
359    pub fn is_active(&self) -> bool {
360        matches!(self.inner.status, Status::Active)
361    }
362
363    /// Check if this is an archived journal file
364    pub fn is_archived(&self) -> bool {
365        matches!(self.inner.status, Status::Archived { .. })
366    }
367
368    /// Check if this is a corrupted/disposed journal file
369    pub fn is_disposed(&self) -> bool {
370        matches!(self.inner.status, Status::Disposed { .. })
371    }
372
373    /// Check if this contains logs from users
374    pub fn is_user(&self) -> bool {
375        matches!(self.inner.origin.source, Source::User(_))
376    }
377
378    /// Check if this contains logs from system
379    pub fn is_system(&self) -> bool {
380        matches!(self.inner.origin.source, Source::System)
381    }
382
383    pub fn is_remote(&self) -> bool {
384        matches!(self.inner.origin.source, Source::Remote(_))
385    }
386
387    /// Get the user ID if this is a user journal
388    pub fn user_id(&self) -> Option<u32> {
389        match &self.inner.origin.source {
390            Source::User(uid) => Some(*uid),
391            _ => None,
392        }
393    }
394
395    /// Get the remote host if this is a remote journal
396    pub fn remote_host(&self) -> Option<&str> {
397        match &self.inner.origin.source {
398            Source::Remote(host) => Some(host.as_str()),
399            _ => None,
400        }
401    }
402
403    /// Get the namespace if this journal belongs to a namespace
404    pub fn namespace(&self) -> Option<&str> {
405        self.inner.origin.namespace.as_deref()
406    }
407}
408
409fn parse_machine_id_namespace(dirname: &str) -> (Option<Uuid>, Option<String>) {
410    if let Some((id_str, ns)) = dirname.split_once('.') {
411        let Some(machine_id) = Uuid::try_parse(id_str).ok() else {
412            return (None, None);
413        };
414        (Some(machine_id), Some(ns.to_string()))
415    } else {
416        (Uuid::try_parse(dirname).ok(), None)
417    }
418}
419
#[cfg(test)]
mod tests {
    use super::{File, Source, Status};
    use std::path::PathBuf;

    /// A native absolute path under a machine-ID directory yields the path,
    /// machine ID, source and status.
    #[test]
    fn from_path_parses_native_absolute_paths() {
        let tmp = tempfile::tempdir().expect("temp dir");
        let machine_dir = tmp.path().join("00112233445566778899aabbccddeeff");
        let journal_path = machine_dir.join("system.journal");

        let parsed = File::from_path(&journal_path).expect("native absolute path parses");

        assert_eq!(parsed.path(), journal_path.to_str().expect("utf8 path"));

        let machine_id = parsed.origin().machine_id.expect("machine id");
        assert_eq!(
            machine_id.simple().to_string(),
            "00112233445566778899aabbccddeeff"
        );
        assert_eq!(parsed.origin().source, Source::System);
        assert_eq!(parsed.status(), &Status::Active);
    }

    /// Relative paths are refused outright.
    #[test]
    fn from_path_rejects_relative_paths() {
        let relative = PathBuf::from("system.journal");
        assert!(File::from_path(&relative).is_none());
    }

    /// The raw constructor keeps the path and reports the file stem as an
    /// unknown source.
    #[test]
    fn from_raw_path_accepts_native_absolute_paths() {
        let tmp = tempfile::tempdir().expect("temp dir");
        let journal_path = tmp.path().join("raw-byte-names.journal");

        let parsed = File::from_raw_path(&journal_path).expect("raw native absolute path parses");

        assert_eq!(parsed.path(), journal_path.to_str().expect("utf8 path"));
        assert_eq!(
            parsed.origin().source,
            Source::Unknown("raw-byte-names".to_string())
        );
        assert_eq!(parsed.status(), &Status::Active);
    }
}
466
467impl Ord for File {
468    fn cmp(&self, other: &Self) -> Ordering {
469        // First compare by status, then by path for stability
470        self.inner
471            .status
472            .cmp(&other.inner.status)
473            .then_with(|| self.inner.path.cmp(&other.inner.path))
474    }
475}
476
477impl PartialOrd for File {
478    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
479        Some(self.cmp(other))
480    }
481}
482
483/// Scan a directory recursively for journal files
484pub fn scan_journal_files(path: &str) -> Result<Vec<File>> {
485    let mut files = Vec::new();
486
487    for entry in walkdir::WalkDir::new(path).follow_links(false) {
488        let entry = entry?;
489        let path = entry.path();
490
491        if path.is_file() {
492            if let Some(file) = File::from_path(path) {
493                files.push(file);
494            }
495        }
496    }
497
498    Ok(files)
499}