use crate::error::{Result, RouterError};
use crate::router::RoutedMessage;
use std::path::Path;
use std::time::{Duration, SystemTime, UNIX_EPOCH};
use tokio::fs::{self, File};
use tokio::io::AsyncWriteExt;
use tokio::sync::broadcast::{self, error::RecvError};
use tokio_util::sync::CancellationToken;
use tracing::{error, info, warn};
const FLUSH_INTERVAL: Duration = Duration::from_secs(1);
pub async fn run(
logs_dir: String,
mut bus_rx: broadcast::Receiver<RoutedMessage>,
cancel_token: CancellationToken,
) -> Result<()> {
let dir = Path::new(&logs_dir);
if !dir.exists() {
fs::create_dir_all(dir)
.await
.map_err(|e| RouterError::filesystem(&logs_dir, e))?;
}
let now = SystemTime::now();
let since_the_epoch_us = now
.duration_since(UNIX_EPOCH)
.unwrap_or(std::time::Duration::from_secs(0))
.as_micros() as u64;
let filename = format!("flight_{}.tlog", since_the_epoch_us);
let path = dir.join(filename);
info!("TLog Logger: Logging to {:?}", path);
let path_str = path.display().to_string();
let file = File::create(&path)
.await
.map_err(|e| RouterError::filesystem(&path_str, e))?;
let mut writer = tokio::io::BufWriter::new(file);
let mut flush_interval = tokio::time::interval(FLUSH_INTERVAL);
flush_interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
loop {
tokio::select! {
_ = cancel_token.cancelled() => {
info!("TLog Logger: Shutdown signal received.");
break;
}
_ = flush_interval.tick() => {
if let Err(e) = writer.flush().await {
error!("TLog periodic flush error: {}", e);
return Err(RouterError::filesystem(&path_str, e));
}
}
res = bus_rx.recv() => {
match res {
Ok(msg) => {
let timestamp_us = msg.timestamp_us;
let ts_bytes = timestamp_us.to_be_bytes();
if let Err(e) = writer.write_all(&ts_bytes).await {
error!("TLog write error: {}", e);
return Err(RouterError::filesystem(&path_str, e));
}
if let Err(e) = writer.write_all(&msg.serialized_bytes).await {
error!("TLog write error: {}", e);
return Err(RouterError::filesystem(&path_str, e));
}
}
Err(RecvError::Lagged(n)) => {
warn!("TLog Logger lagged: missed {} messages", n);
}
Err(RecvError::Closed) => break,
}
}
}
}
if let Err(e) = writer.flush().await {
error!("TLog flush error: {}", e);
}
info!("TLog Logger finished flushing and closing.");
Ok(())
}