use std::fs::OpenOptions;
use std::io::Write;
use std::path::{Path, PathBuf};
use std::sync::Arc;
use std::time::Duration;
use serde::Serialize;
use tokio::sync::mpsc;
use tokio::sync::mpsc::error::TrySendError;
/// File name, inside the log directory, that receives request log lines.
pub const GATEWAY_REQUEST_LOG_FILENAME: &str = "gateway_request.log";
/// File name, inside the log directory, that receives operation log lines.
pub const GATEWAY_OPERATION_LOG_FILENAME: &str = "gateway_operation.log";
/// File name, inside the log directory, that receives API-key auth log lines.
pub const API_KEY_AUTH_LOG_FILENAME: &str = "api_key_auth.log";
/// Capacity of the bounded channel between producers and the writer task.
/// Producers use a non-blocking send, so lines are dropped once this many are pending.
const CHANNEL_CAP: usize = 50_000;
/// Period of the timer that flushes whatever has been batched so far.
const BATCH_FLUSH_INTERVAL: Duration = Duration::from_millis(100);
/// Number of batched lines that triggers a flush without waiting for the timer.
const BATCH_MAX_LINES: usize = 256;
/// Selects which of the three log files a buffered line is appended to.
#[derive(Clone, Copy, Debug)]
enum LineKind {
/// Line goes to the file named by `GATEWAY_REQUEST_LOG_FILENAME`.
Request,
/// Line goes to the file named by `GATEWAY_OPERATION_LOG_FILENAME`.
Operation,
/// Line goes to the file named by `API_KEY_AUTH_LOG_FILENAME`.
ApiKey,
}
/// One already-serialized log line travelling from a producer to the writer task.
#[derive(Clone, Debug)]
struct BufferedLine {
// Destination file selector.
kind: LineKind,
// Raw bytes to append; the enqueue methods store JSON followed by a single `\n`.
payload: Arc<[u8]>,
}
/// Cloneable handle used to queue log lines for the background file writer.
///
/// The handle only owns the sending half of the channel; the receiving half is
/// moved into the worker task started by `spawn`.
#[derive(Clone)]
pub struct LinuxGatewayFileLog {
// Sending half of the bounded channel drained by the writer task.
tx: mpsc::Sender<BufferedLine>,
}
impl LinuxGatewayFileLog {
pub fn spawn(log_dir: PathBuf) -> Self {
let (tx, rx) = mpsc::channel(CHANNEL_CAP);
tokio::spawn(run_linux_file_log_worker(rx, log_dir));
Self { tx }
}
pub fn try_init<P>(enabled: bool, log_dir: P) -> Option<Arc<Self>>
where
P: Into<PathBuf>,
{
if !cfg!(target_os = "linux") || !enabled {
return None;
}
let dir: PathBuf = log_dir.into();
match probe_log_dir(&dir) {
Ok(()) => Some(Arc::new(Self::spawn(dir))),
Err(err) => {
tracing::warn!(
target: "athena_rs::linux_gateway_file_log",
path = %dir.display(),
error = %err,
"gateway Linux file logging disabled: directory not usable"
);
None
}
}
}
fn try_send_line(&self, kind: LineKind, payload: Arc<[u8]>) -> Result<(), BufferedLine> {
self.tx
.try_send(BufferedLine { kind, payload })
.map_err(|err| match err {
TrySendError::Full(line) | TrySendError::Closed(line) => line,
})
}
pub fn try_enqueue_request<T: Serialize>(&self, entry: &T) -> Result<(), ()> {
match serde_json::to_vec(entry) {
Ok(mut vec) => {
vec.push(b'\n');
match self.try_send_line(LineKind::Request, vec.into()) {
Ok(()) => Ok(()),
Err(line) => {
tracing::warn!(
target: "athena_rs::linux_gateway_file_log",
"gateway request log file channel full; dropping line"
);
drop(line);
Err(())
}
}
}
Err(err) => {
tracing::warn!(
target: "athena_rs::linux_gateway_file_log",
error = %err,
"failed to serialize gateway request log line"
);
Err(())
}
}
}
pub fn try_enqueue_operation<T: Serialize>(&self, entry: &T) -> Result<(), ()> {
match serde_json::to_vec(entry) {
Ok(mut vec) => {
vec.push(b'\n');
match self.try_send_line(LineKind::Operation, vec.into()) {
Ok(()) => Ok(()),
Err(line) => {
tracing::warn!(
target: "athena_rs::linux_gateway_file_log",
"gateway operation log file channel full; dropping line"
);
drop(line);
Err(())
}
}
}
Err(err) => {
tracing::warn!(
target: "athena_rs::linux_gateway_file_log",
error = %err,
"failed to serialize gateway operation log line"
);
Err(())
}
}
}
pub fn try_enqueue_api_key_auth<T: Serialize>(&self, entry: &T) -> Result<(), ()> {
match serde_json::to_vec(entry) {
Ok(mut vec) => {
vec.push(b'\n');
match self.try_send_line(LineKind::ApiKey, vec.into()) {
Ok(()) => Ok(()),
Err(line) => {
tracing::warn!(
target: "athena_rs::linux_gateway_file_log",
"api_key_auth log file channel full; dropping line"
);
drop(line);
Err(())
}
}
}
Err(err) => {
tracing::warn!(
target: "athena_rs::linux_gateway_file_log",
error = %err,
"failed to serialize api_key_auth log line"
);
Err(())
}
}
}
}
/// Verifies that `dir` exists (creating it and any missing parents) and that
/// files can be written inside it.
///
/// A marker file named `.athena_write_probe` is opened in append mode, a
/// newline is written to it, and the file is then removed on a best-effort
/// basis. Any error from creating the directory, opening the marker, or
/// writing to it is returned.
fn probe_log_dir(dir: &Path) -> std::io::Result<()> {
    std::fs::create_dir_all(dir)?;

    let marker = dir.join(".athena_write_probe");
    let mut options = OpenOptions::new();
    options.create(true).append(true);
    // The handle is owned by the closure, so the file is closed before removal.
    options
        .open(&marker)
        .and_then(|mut handle| writeln!(handle))?;

    // Cleanup failure does not make the directory unusable; ignore it.
    let _ = std::fs::remove_file(marker);
    Ok(())
}
async fn run_linux_file_log_worker(mut rx: mpsc::Receiver<BufferedLine>, log_dir: PathBuf) {
let request_path = log_dir.join(GATEWAY_REQUEST_LOG_FILENAME);
let operation_path = log_dir.join(GATEWAY_OPERATION_LOG_FILENAME);
let api_key_path = log_dir.join(API_KEY_AUTH_LOG_FILENAME);
let mut tick = tokio::time::interval(BATCH_FLUSH_INTERVAL);
tick.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
let mut batch: Vec<BufferedLine> = Vec::with_capacity(BATCH_MAX_LINES);
loop {
tokio::select! {
biased;
msg = rx.recv() => {
match msg {
Some(line) => {
batch.push(line);
if batch.len() >= BATCH_MAX_LINES {
flush_batch_to_disk(&request_path, &operation_path, &api_key_path, &mut batch).await;
}
}
None => {
flush_batch_to_disk(&request_path, &operation_path, &api_key_path, &mut batch).await;
return;
}
}
}
_ = tick.tick() => {
flush_batch_to_disk(&request_path, &operation_path, &api_key_path, &mut batch).await;
}
}
}
}
async fn flush_batch_to_disk(
request_path: &Path,
operation_path: &Path,
api_key_path: &Path,
batch: &mut Vec<BufferedLine>,
) {
if batch.is_empty() {
return;
}
let drained: Vec<BufferedLine> = std::mem::take(batch);
let req_path = request_path.to_path_buf();
let op_path = operation_path.to_path_buf();
let key_path = api_key_path.to_path_buf();
let result = tokio::task::spawn_blocking(move || {
let mut request_buf: Vec<u8> = Vec::new();
let mut operation_buf: Vec<u8> = Vec::new();
let mut api_key_buf: Vec<u8> = Vec::new();
for line in drained {
match line.kind {
LineKind::Request => request_buf.extend_from_slice(&line.payload),
LineKind::Operation => operation_buf.extend_from_slice(&line.payload),
LineKind::ApiKey => api_key_buf.extend_from_slice(&line.payload),
}
}
if !request_buf.is_empty() {
append_file(&req_path, &request_buf)?;
}
if !operation_buf.is_empty() {
append_file(&op_path, &operation_buf)?;
}
if !api_key_buf.is_empty() {
append_file(&key_path, &api_key_buf)?;
}
Ok::<(), std::io::Error>(())
})
.await;
match result {
Ok(Ok(())) => {}
Ok(Err(err)) => {
tracing::warn!(
target: "athena_rs::linux_gateway_file_log",
error = %err,
"failed to append gateway Linux file logs"
);
}
Err(err) => {
tracing::warn!(
target: "athena_rs::linux_gateway_file_log",
error = %err,
"Linux file log flush task failed"
);
}
}
}
/// Appends `bytes` to the file at `path`, creating the file if it does not exist.
///
/// Returns the first I/O error from opening or writing the file.
fn append_file(path: &Path, bytes: &[u8]) -> std::io::Result<()> {
    OpenOptions::new()
        .create(true)
        .append(true)
        .open(path)
        .and_then(|mut sink| sink.write_all(bytes))
}
#[cfg(test)]
mod tests {
    use super::*;
    use std::fs;

    /// Builds a unique scratch path under the system temp directory.
    /// The directory itself is not created.
    fn scratch_dir(name: &str) -> PathBuf {
        std::env::temp_dir().join(format!(
            "athena_linux_flog_{}_{}_{}",
            name,
            std::process::id(),
            std::time::SystemTime::now()
                .duration_since(std::time::UNIX_EPOCH)
                .map(|d| d.as_nanos())
                .unwrap_or(0)
        ))
    }

    /// Probing a directory that would have to be created inside a read-only
    /// parent must fail.
    #[test]
    #[cfg(unix)]
    fn probe_rejects_non_writable_parent() {
        let tmp = scratch_dir("probe");
        let nested = tmp.join("nested");
        fs::create_dir_all(&nested).expect("mkdir");
        let mut perms = fs::metadata(&nested).expect("meta").permissions();
        perms.set_readonly(true);
        fs::set_permissions(&nested, perms).expect("chmod");

        // A privileged user (e.g. root in a CI container) bypasses permission
        // bits, which voids the premise of this test; skip in that case.
        if fs::create_dir(nested.join("privilege_canary")).is_ok() {
            let _ = fs::remove_dir_all(&tmp);
            return;
        }

        let dead = nested.join("impossible");
        assert!(probe_log_dir(&dead).is_err());
        let _ = fs::remove_dir_all(&tmp);
    }

    /// A queued request entry ends up as a JSON line in the request log file.
    #[tokio::test]
    async fn round_trip_ndjson_line() {
        let tmp = scratch_dir("rt");
        fs::create_dir_all(&tmp).expect("mkdir");
        probe_log_dir(&tmp).expect("probe");
        let sink = LinuxGatewayFileLog::spawn(tmp.clone());
        #[derive(Serialize)]
        struct Row {
            k: i32,
        }
        sink.try_enqueue_request(&Row { k: 1 }).expect("enqueue");

        // Poll for the flushed line instead of relying on one fixed sleep, so
        // the test is both faster on idle machines and tolerant of slow ones.
        let path = tmp.join(GATEWAY_REQUEST_LOG_FILENAME);
        let deadline = std::time::Instant::now() + Duration::from_secs(5);
        let text = loop {
            if let Ok(text) = fs::read_to_string(&path) {
                if text.contains("\"k\":1") {
                    break text;
                }
            }
            assert!(
                std::time::Instant::now() < deadline,
                "log line was not flushed to {} in time",
                path.display()
            );
            tokio::time::sleep(Duration::from_millis(20)).await;
        };
        assert!(text.contains("\"k\":1"));
        let _ = fs::remove_dir_all(&tmp);
    }
}