Skip to main content

athena_observability/
linux_gateway_file_log.rs

1//! Optional Linux file sink for gateway request/operation logs and `api_key_auth_log` (NDJSON).
2
3use std::fs::OpenOptions;
4use std::io::Write;
5use std::path::{Path, PathBuf};
6use std::sync::Arc;
7use std::time::Duration;
8
9use serde::Serialize;
10use tokio::sync::mpsc;
11use tokio::sync::mpsc::error::TrySendError;
12
/// File name, relative to the log directory, that receives gateway request lines.
pub const GATEWAY_REQUEST_LOG_FILENAME: &str = "gateway_request.log";
/// File name, relative to the log directory, that receives gateway operation lines.
pub const GATEWAY_OPERATION_LOG_FILENAME: &str = "gateway_operation.log";
/// File name, relative to the log directory, that receives api-key auth lines.
pub const API_KEY_AUTH_LOG_FILENAME: &str = "api_key_auth.log";

/// Capacity of the bounded channel between producers and the writer task.
const CHANNEL_CAP: usize = 50_000;
/// Period of the writer task's timer; every tick flushes whatever is buffered.
const BATCH_FLUSH_INTERVAL: Duration = Duration::from_millis(100);
/// Buffered line count at which the writer task flushes without waiting for the timer.
const BATCH_MAX_LINES: usize = 256;
20
/// Identifies which of the three log files a buffered line belongs to.
#[derive(Debug, Copy, Clone)]
enum LineKind {
    /// Line destined for the gateway request log.
    Request,
    /// Line destined for the gateway operation log.
    Operation,
    /// Line destined for the api-key auth log.
    ApiKey,
}
27
28#[derive(Clone, Debug)]
29struct BufferedLine {
30    kind: LineKind,
31    payload: Arc<[u8]>,
32}
33
34#[derive(Clone)]
35pub struct LinuxGatewayFileLog {
36    tx: mpsc::Sender<BufferedLine>,
37}
38
39impl LinuxGatewayFileLog {
40    /// Spawns the async file sink against `log_dir`.
41    pub fn spawn(log_dir: PathBuf) -> Self {
42        let (tx, rx) = mpsc::channel(CHANNEL_CAP);
43        tokio::spawn(run_linux_file_log_worker(rx, log_dir));
44        Self { tx }
45    }
46
47    /// Builds the sink only when Linux file logging is enabled and usable.
48    pub fn try_init<P>(enabled: bool, log_dir: P) -> Option<Arc<Self>>
49    where
50        P: Into<PathBuf>,
51    {
52        if !cfg!(target_os = "linux") || !enabled {
53            return None;
54        }
55
56        let dir: PathBuf = log_dir.into();
57        match probe_log_dir(&dir) {
58            Ok(()) => Some(Arc::new(Self::spawn(dir))),
59            Err(err) => {
60                tracing::warn!(
61                    target: "athena_rs::linux_gateway_file_log",
62                    path = %dir.display(),
63                    error = %err,
64                    "gateway Linux file logging disabled: directory not usable"
65                );
66                None
67            }
68        }
69    }
70
71    fn try_send_line(&self, kind: LineKind, payload: Arc<[u8]>) -> Result<(), BufferedLine> {
72        self.tx
73            .try_send(BufferedLine { kind, payload })
74            .map_err(|err| match err {
75                TrySendError::Full(line) | TrySendError::Closed(line) => line,
76            })
77    }
78
79    /// Enqueues one gateway request row.
80    pub fn try_enqueue_request<T: Serialize>(&self, entry: &T) -> Result<(), ()> {
81        match serde_json::to_vec(entry) {
82            Ok(mut vec) => {
83                vec.push(b'\n');
84                match self.try_send_line(LineKind::Request, vec.into()) {
85                    Ok(()) => Ok(()),
86                    Err(line) => {
87                        tracing::warn!(
88                            target: "athena_rs::linux_gateway_file_log",
89                            "gateway request log file channel full; dropping line"
90                        );
91                        drop(line);
92                        Err(())
93                    }
94                }
95            }
96            Err(err) => {
97                tracing::warn!(
98                    target: "athena_rs::linux_gateway_file_log",
99                    error = %err,
100                    "failed to serialize gateway request log line"
101                );
102                Err(())
103            }
104        }
105    }
106
107    /// Enqueues one gateway operation row.
108    pub fn try_enqueue_operation<T: Serialize>(&self, entry: &T) -> Result<(), ()> {
109        match serde_json::to_vec(entry) {
110            Ok(mut vec) => {
111                vec.push(b'\n');
112                match self.try_send_line(LineKind::Operation, vec.into()) {
113                    Ok(()) => Ok(()),
114                    Err(line) => {
115                        tracing::warn!(
116                            target: "athena_rs::linux_gateway_file_log",
117                            "gateway operation log file channel full; dropping line"
118                        );
119                        drop(line);
120                        Err(())
121                    }
122                }
123            }
124            Err(err) => {
125                tracing::warn!(
126                    target: "athena_rs::linux_gateway_file_log",
127                    error = %err,
128                    "failed to serialize gateway operation log line"
129                );
130                Err(())
131            }
132        }
133    }
134
135    /// Enqueues one api-key auth row.
136    pub fn try_enqueue_api_key_auth<T: Serialize>(&self, entry: &T) -> Result<(), ()> {
137        match serde_json::to_vec(entry) {
138            Ok(mut vec) => {
139                vec.push(b'\n');
140                match self.try_send_line(LineKind::ApiKey, vec.into()) {
141                    Ok(()) => Ok(()),
142                    Err(line) => {
143                        tracing::warn!(
144                            target: "athena_rs::linux_gateway_file_log",
145                            "api_key_auth log file channel full; dropping line"
146                        );
147                        drop(line);
148                        Err(())
149                    }
150                }
151            }
152            Err(err) => {
153                tracing::warn!(
154                    target: "athena_rs::linux_gateway_file_log",
155                    error = %err,
156                    "failed to serialize api_key_auth log line"
157                );
158                Err(())
159            }
160        }
161    }
162}
163
/// Checks that `dir` can be used as a log directory.
///
/// Creates `dir` (and missing parents), then appends a newline to a
/// `.athena_write_probe` file inside it. The probe file is removed afterwards on
/// a best-effort basis; a failed removal is ignored.
///
/// # Errors
///
/// Returns the I/O error from creating the directory, opening the probe file, or
/// writing to it.
fn probe_log_dir(dir: &Path) -> std::io::Result<()> {
    std::fs::create_dir_all(dir)?;

    let marker = dir.join(".athena_write_probe");
    // The file handle lives only inside the closure, so it is closed before the
    // removal below.
    OpenOptions::new()
        .create(true)
        .append(true)
        .open(&marker)
        .and_then(|mut handle| writeln!(handle))?;

    let _ = std::fs::remove_file(marker);
    Ok(())
}
177
178async fn run_linux_file_log_worker(mut rx: mpsc::Receiver<BufferedLine>, log_dir: PathBuf) {
179    let request_path = log_dir.join(GATEWAY_REQUEST_LOG_FILENAME);
180    let operation_path = log_dir.join(GATEWAY_OPERATION_LOG_FILENAME);
181    let api_key_path = log_dir.join(API_KEY_AUTH_LOG_FILENAME);
182
183    let mut tick = tokio::time::interval(BATCH_FLUSH_INTERVAL);
184    tick.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
185    let mut batch: Vec<BufferedLine> = Vec::with_capacity(BATCH_MAX_LINES);
186
187    loop {
188        tokio::select! {
189            biased;
190            msg = rx.recv() => {
191                match msg {
192                    Some(line) => {
193                        batch.push(line);
194                        if batch.len() >= BATCH_MAX_LINES {
195                            flush_batch_to_disk(&request_path, &operation_path, &api_key_path, &mut batch).await;
196                        }
197                    }
198                    None => {
199                        flush_batch_to_disk(&request_path, &operation_path, &api_key_path, &mut batch).await;
200                        return;
201                    }
202                }
203            }
204            _ = tick.tick() => {
205                flush_batch_to_disk(&request_path, &operation_path, &api_key_path, &mut batch).await;
206            }
207        }
208    }
209}
210
211async fn flush_batch_to_disk(
212    request_path: &Path,
213    operation_path: &Path,
214    api_key_path: &Path,
215    batch: &mut Vec<BufferedLine>,
216) {
217    if batch.is_empty() {
218        return;
219    }
220
221    let drained: Vec<BufferedLine> = std::mem::take(batch);
222    let req_path = request_path.to_path_buf();
223    let op_path = operation_path.to_path_buf();
224    let key_path = api_key_path.to_path_buf();
225
226    let result = tokio::task::spawn_blocking(move || {
227        let mut request_buf: Vec<u8> = Vec::new();
228        let mut operation_buf: Vec<u8> = Vec::new();
229        let mut api_key_buf: Vec<u8> = Vec::new();
230
231        for line in drained {
232            match line.kind {
233                LineKind::Request => request_buf.extend_from_slice(&line.payload),
234                LineKind::Operation => operation_buf.extend_from_slice(&line.payload),
235                LineKind::ApiKey => api_key_buf.extend_from_slice(&line.payload),
236            }
237        }
238
239        if !request_buf.is_empty() {
240            append_file(&req_path, &request_buf)?;
241        }
242        if !operation_buf.is_empty() {
243            append_file(&op_path, &operation_buf)?;
244        }
245        if !api_key_buf.is_empty() {
246            append_file(&key_path, &api_key_buf)?;
247        }
248        Ok::<(), std::io::Error>(())
249    })
250    .await;
251
252    match result {
253        Ok(Ok(())) => {}
254        Ok(Err(err)) => {
255            tracing::warn!(
256                target: "athena_rs::linux_gateway_file_log",
257                error = %err,
258                "failed to append gateway Linux file logs"
259            );
260        }
261        Err(err) => {
262            tracing::warn!(
263                target: "athena_rs::linux_gateway_file_log",
264                error = %err,
265                "Linux file log flush task failed"
266            );
267        }
268    }
269}
270
/// Appends `bytes` to the file at `path`, creating the file if it does not exist.
///
/// # Errors
///
/// Returns the I/O error from opening the file or from writing to it.
fn append_file(path: &Path, bytes: &[u8]) -> std::io::Result<()> {
    OpenOptions::new()
        .create(true)
        .append(true)
        .open(path)
        .and_then(|mut sink| sink.write_all(bytes))
}
276
#[cfg(test)]
mod tests {
    use super::*;
    use std::fs;
    use std::time::Instant;

    /// Builds a scratch path under the system temp dir, unique per test name,
    /// process id, and current time.
    fn scratch_dir(name: &str) -> PathBuf {
        std::env::temp_dir().join(format!(
            "athena_linux_flog_{}_{}_{}",
            name,
            std::process::id(),
            std::time::SystemTime::now()
                .duration_since(std::time::UNIX_EPOCH)
                .map(|d| d.as_nanos())
                .unwrap_or(0)
        ))
    }

    /// The probe must fail when the log directory would have to be created inside
    /// a read-only directory.
    #[test]
    #[cfg(unix)]
    fn probe_rejects_non_writable_parent() {
        let tmp = scratch_dir("probe");
        let nested = tmp.join("nested");
        fs::create_dir_all(&nested).expect("mkdir");
        let original = fs::metadata(&nested).expect("meta").permissions();
        let mut read_only = original.clone();
        read_only.set_readonly(true);
        fs::set_permissions(&nested, read_only).expect("chmod");

        // A privileged user (e.g. root in a CI container) can still write into a
        // read-only directory; detect that so the assertion is only made when the
        // permission bits are actually enforced.
        let privileged = fs::create_dir(nested.join("privilege_check")).is_ok();
        let probe = probe_log_dir(&nested.join("impossible"));

        // Restore permissions and clean up before asserting so a failure does not
        // leave a read-only directory behind.
        let _ = fs::set_permissions(&nested, original);
        let _ = fs::remove_dir_all(&tmp);

        if !privileged {
            assert!(probe.is_err());
        }
    }

    /// A row queued through the public API ends up in the request log file.
    #[tokio::test]
    async fn round_trip_ndjson_line() {
        let tmp = scratch_dir("rt");
        fs::create_dir_all(&tmp).expect("mkdir");
        probe_log_dir(&tmp).expect("probe");

        let sink = LinuxGatewayFileLog::spawn(tmp.clone());
        #[derive(Serialize)]
        struct Row {
            k: i32,
        }
        sink.try_enqueue_request(&Row { k: 1 }).expect("enqueue");

        // Poll for the flushed line instead of sleeping for a fixed time: this
        // finishes as soon as the writer has flushed and tolerates slow machines.
        let path = tmp.join(GATEWAY_REQUEST_LOG_FILENAME);
        let deadline = Instant::now() + Duration::from_secs(5);
        let flushed = loop {
            let found = fs::read_to_string(&path)
                .map(|text| text.contains("\"k\":1"))
                .unwrap_or(false);
            if found || Instant::now() >= deadline {
                break found;
            }
            tokio::time::sleep(Duration::from_millis(10)).await;
        };

        let _ = fs::remove_dir_all(&tmp);
        assert!(flushed, "request line was not flushed to {}", path.display());
    }
}
329}