use std::path::PathBuf;
use std::sync::{Arc, Mutex};
use tokio::io::AsyncWriteExt;
use tokio::sync::{broadcast, mpsc};
use tracing::warn;
use crate::vortix_core::engine::event::EventEnvelope;
use super::TailBuffer;
pub(crate) async fn run(
path: PathBuf,
mut mpsc_rx: mpsc::UnboundedReceiver<EventEnvelope>,
bcast_tx: broadcast::Sender<EventEnvelope>,
tail: Arc<Mutex<TailBuffer>>,
) {
let mut file = match tokio::fs::OpenOptions::new()
.create(true)
.append(true)
.open(&path)
.await
{
Ok(f) => f,
Err(e) => {
warn!(
target: "vortix::journal",
path = %path.display(),
error = %e,
"failed to open journal file; events will be dropped"
);
while mpsc_rx.recv().await.is_some() {}
return;
}
};
while let Some(env) = mpsc_rx.recv().await {
match serde_json::to_vec(&env) {
Ok(mut bytes) => {
bytes.push(b'\n');
if let Err(e) = file.write_all(&bytes).await {
warn!(
target: "vortix::journal",
path = %path.display(),
error = %e,
"journal write failed"
);
} else if let Err(e) = file.flush().await {
warn!(target: "vortix::journal", error = %e, "journal flush failed");
}
}
Err(e) => {
warn!(
target: "vortix::journal",
error = %e,
"failed to serialise journal record"
);
}
}
let _ = bcast_tx.send(env.clone());
tail.lock().unwrap().push(env);
}
}
pub(crate) async fn run_in_memory(
mut mpsc_rx: mpsc::UnboundedReceiver<EventEnvelope>,
bcast_tx: broadcast::Sender<EventEnvelope>,
tail: Arc<Mutex<TailBuffer>>,
) {
while let Some(env) = mpsc_rx.recv().await {
let _ = bcast_tx.send(env.clone());
tail.lock().unwrap().push(env);
}
}