mod retention;
mod writer;
use std::path::PathBuf;
use std::sync::{Arc, Mutex};
use tokio::sync::{broadcast, mpsc};
use crate::vortix_core::engine::event::{EngineEvent, EventEnvelope};
pub use retention::RetentionStats;
pub const DEFAULT_RETENTION_DAYS: u32 = 30;
pub const DEFAULT_RETENTION_COUNT: u32 = 30;
pub const DEFAULT_BROADCAST_CAPACITY: usize = 1024;
pub const DEFAULT_TAIL_BUFFER_CAPACITY: usize = 1000;
static GLOBAL_JOURNAL: std::sync::OnceLock<Journal> = std::sync::OnceLock::new();
pub fn set_global_journal(journal: Journal) {
let _ = GLOBAL_JOURNAL.set(journal);
}
#[must_use]
pub fn global_journal() -> Option<&'static Journal> {
GLOBAL_JOURNAL.get()
}
#[derive(Debug, Clone)]
pub struct JournalConfig {
pub disk: bool,
pub retention_days: u32,
pub retention_count: u32,
pub journal_dir: Option<PathBuf>,
pub tail_capacity: usize,
pub broadcast_capacity: usize,
}
impl Default for JournalConfig {
fn default() -> Self {
Self {
disk: true,
retention_days: DEFAULT_RETENTION_DAYS,
retention_count: DEFAULT_RETENTION_COUNT,
journal_dir: None,
tail_capacity: DEFAULT_TAIL_BUFFER_CAPACITY,
broadcast_capacity: DEFAULT_BROADCAST_CAPACITY,
}
}
}
#[derive(Clone)]
pub struct Journal {
sender: mpsc::UnboundedSender<EventEnvelope>,
broadcaster: broadcast::Sender<EventEnvelope>,
tail: Arc<Mutex<TailBuffer>>,
pub session_path: Option<PathBuf>,
}
impl std::fmt::Debug for Journal {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("Journal")
.field("session_path", &self.session_path)
.field("subscribers", &self.broadcaster.receiver_count())
.finish_non_exhaustive()
}
}
#[derive(Debug)]
struct TailBuffer {
capacity: usize,
items: std::collections::VecDeque<EventEnvelope>,
}
impl TailBuffer {
fn new(capacity: usize) -> Self {
Self {
capacity,
items: std::collections::VecDeque::with_capacity(capacity),
}
}
fn push(&mut self, env: EventEnvelope) {
if self.items.len() == self.capacity {
self.items.pop_front();
}
self.items.push_back(env);
}
fn snapshot(&self) -> Vec<EventEnvelope> {
self.items.iter().cloned().collect()
}
}
impl Journal {
#[allow(clippy::needless_pass_by_value)] pub fn open(config: JournalConfig) -> std::io::Result<Self> {
let (mpsc_tx, mpsc_rx) = mpsc::unbounded_channel::<EventEnvelope>();
let (bcast_tx, _) = broadcast::channel::<EventEnvelope>(config.broadcast_capacity);
let tail = Arc::new(Mutex::new(TailBuffer::new(config.tail_capacity)));
let mut session_path = None;
let journal_dir = if config.disk {
let dir = match config.journal_dir.clone() {
Some(d) => d,
None => default_journal_dir()?,
};
std::fs::create_dir_all(&dir)?;
Some(dir)
} else {
None
};
let retention_stats = if let Some(dir) = &journal_dir {
retention::prune(dir, config.retention_days, config.retention_count).unwrap_or_default()
} else {
RetentionStats::default()
};
if config.disk {
let dir = journal_dir.expect("journal_dir resolved when disk=true");
let pid = std::process::id();
let stamp = iso_timestamp();
let path = dir.join(format!("{stamp}-{pid}.jsonl"));
session_path = Some(path.clone());
tokio::spawn(writer::run(
path,
mpsc_rx,
bcast_tx.clone(),
Arc::clone(&tail),
));
} else {
let bcast_for_task = bcast_tx.clone();
let tail_for_task = Arc::clone(&tail);
tokio::spawn(writer::run_in_memory(
mpsc_rx,
bcast_for_task,
tail_for_task,
));
}
let journal = Self {
sender: mpsc_tx,
broadcaster: bcast_tx,
tail,
session_path,
};
let _ = journal.append(EngineEvent::JournalRetentionApplied {
deleted: retention_stats.deleted,
});
Ok(journal)
}
pub fn append(&self, event: EngineEvent) -> Result<(), JournalError> {
let env = EventEnvelope::new(event);
self.sender
.send(env)
.map_err(|_| JournalError::WriterGone)?;
Ok(())
}
#[must_use]
pub fn subscribe(&self) -> broadcast::Receiver<EventEnvelope> {
self.broadcaster.subscribe()
}
#[must_use]
pub fn tail(&self) -> Vec<EventEnvelope> {
self.tail.lock().unwrap().snapshot()
}
}
#[derive(Debug, thiserror::Error)]
pub enum JournalError {
#[error("journal writer task has terminated")]
WriterGone,
}
fn default_journal_dir() -> std::io::Result<PathBuf> {
use directories::ProjectDirs;
let pd = ProjectDirs::from("", "", "vortix")
.ok_or_else(|| std::io::Error::other("could not resolve XDG data dir"))?;
Ok(pd.data_dir().join("sessions"))
}
fn iso_timestamp() -> String {
use time::format_description::well_known::Iso8601;
let now = time::OffsetDateTime::now_utc();
now.format(&Iso8601::DEFAULT)
.unwrap_or_else(|_| "unknown".to_string())
.replace(':', "")
}
#[cfg(test)]
mod tests {
use super::*;
use crate::vortix_core::engine::event::EngineEvent;
use crate::vortix_core::profile::{ProfileId, ProtocolKind};
fn sample_event() -> EngineEvent {
EngineEvent::TunnelUp {
profile_id: ProfileId::new("corp"),
protocol: ProtocolKind::WireGuard,
interface_name: "wg0".into(),
pid: None,
}
}
#[tokio::test]
async fn disk_mode_writes_jsonl() {
let tmp = tempfile::tempdir().unwrap();
let journal = Journal::open(JournalConfig {
disk: true,
journal_dir: Some(tmp.path().to_path_buf()),
..Default::default()
})
.unwrap();
for _ in 0..5 {
journal.append(sample_event()).unwrap();
}
tokio::time::sleep(std::time::Duration::from_millis(50)).await;
let path = journal.session_path.clone().expect("session path");
let content = std::fs::read_to_string(&path).unwrap();
let lines: Vec<&str> = content.lines().collect();
assert_eq!(lines.len(), 6);
for line in &lines {
let _: EventEnvelope = serde_json::from_str(line).expect("each line is valid JSON");
}
}
#[tokio::test]
async fn disk_disabled_mode_writes_nothing() {
let tmp = tempfile::tempdir().unwrap();
let journal = Journal::open(JournalConfig {
disk: false,
journal_dir: Some(tmp.path().to_path_buf()),
..Default::default()
})
.unwrap();
journal.append(sample_event()).unwrap();
tokio::time::sleep(std::time::Duration::from_millis(50)).await;
assert!(journal.session_path.is_none());
assert!(
std::fs::read_dir(tmp.path()).unwrap().next().is_none(),
"no files should have been written"
);
let tail = journal.tail();
assert!(!tail.is_empty());
}
#[tokio::test]
async fn subscribe_receives_events() {
let tmp = tempfile::tempdir().unwrap();
let journal = Journal::open(JournalConfig {
disk: false,
journal_dir: Some(tmp.path().to_path_buf()),
..Default::default()
})
.unwrap();
let mut rx = journal.subscribe();
journal.append(sample_event()).unwrap();
tokio::time::sleep(std::time::Duration::from_millis(50)).await;
let mut saw_tunnel_up = false;
while let Ok(env) = rx.try_recv() {
if matches!(env.event, EngineEvent::TunnelUp { .. }) {
saw_tunnel_up = true;
}
}
assert!(
saw_tunnel_up,
"subscriber should have received the TunnelUp event"
);
}
}