// athena_observability/linux_gateway_file_log.rs
use std::fs::OpenOptions;
4use std::io::Write;
5use std::path::{Path, PathBuf};
6use std::sync::Arc;
7use std::time::Duration;
8
9use serde::Serialize;
10use tokio::sync::mpsc;
11use tokio::sync::mpsc::error::TrySendError;
12
/// File name, inside the log directory, that receives request log lines.
pub const GATEWAY_REQUEST_LOG_FILENAME: &str = "gateway_request.log";
/// File name, inside the log directory, that receives operation log lines.
pub const GATEWAY_OPERATION_LOG_FILENAME: &str = "gateway_operation.log";
/// File name, inside the log directory, that receives API key auth log lines.
pub const API_KEY_AUTH_LOG_FILENAME: &str = "api_key_auth.log";

/// Capacity of the bounded channel between producers and the writer task.
const CHANNEL_CAP: usize = 50_000;
/// Number of buffered lines that triggers a flush without waiting for the timer.
const BATCH_MAX_LINES: usize = 256;
/// Period of the timer that flushes whatever is currently buffered.
const BATCH_FLUSH_INTERVAL: Duration = Duration::from_millis(100);
20
/// Selects which of the three log files a buffered line is appended to.
#[derive(Debug, Clone, Copy)]
enum LineKind {
    /// Line for the gateway request log.
    Request,
    /// Line for the gateway operation log.
    Operation,
    /// Line for the API key auth log.
    ApiKey,
}
27
28#[derive(Clone, Debug)]
29struct BufferedLine {
30 kind: LineKind,
31 payload: Arc<[u8]>,
32}
33
34#[derive(Clone)]
35pub struct LinuxGatewayFileLog {
36 tx: mpsc::Sender<BufferedLine>,
37}
38
39impl LinuxGatewayFileLog {
40 pub fn spawn(log_dir: PathBuf) -> Self {
42 let (tx, rx) = mpsc::channel(CHANNEL_CAP);
43 tokio::spawn(run_linux_file_log_worker(rx, log_dir));
44 Self { tx }
45 }
46
47 pub fn try_init<P>(enabled: bool, log_dir: P) -> Option<Arc<Self>>
49 where
50 P: Into<PathBuf>,
51 {
52 if !cfg!(target_os = "linux") || !enabled {
53 return None;
54 }
55
56 let dir: PathBuf = log_dir.into();
57 match probe_log_dir(&dir) {
58 Ok(()) => Some(Arc::new(Self::spawn(dir))),
59 Err(err) => {
60 tracing::warn!(
61 target: "athena_rs::linux_gateway_file_log",
62 path = %dir.display(),
63 error = %err,
64 "gateway Linux file logging disabled: directory not usable"
65 );
66 None
67 }
68 }
69 }
70
71 fn try_send_line(&self, kind: LineKind, payload: Arc<[u8]>) -> Result<(), BufferedLine> {
72 self.tx
73 .try_send(BufferedLine { kind, payload })
74 .map_err(|err| match err {
75 TrySendError::Full(line) | TrySendError::Closed(line) => line,
76 })
77 }
78
79 pub fn try_enqueue_request<T: Serialize>(&self, entry: &T) -> Result<(), ()> {
81 match serde_json::to_vec(entry) {
82 Ok(mut vec) => {
83 vec.push(b'\n');
84 match self.try_send_line(LineKind::Request, vec.into()) {
85 Ok(()) => Ok(()),
86 Err(line) => {
87 tracing::warn!(
88 target: "athena_rs::linux_gateway_file_log",
89 "gateway request log file channel full; dropping line"
90 );
91 drop(line);
92 Err(())
93 }
94 }
95 }
96 Err(err) => {
97 tracing::warn!(
98 target: "athena_rs::linux_gateway_file_log",
99 error = %err,
100 "failed to serialize gateway request log line"
101 );
102 Err(())
103 }
104 }
105 }
106
107 pub fn try_enqueue_operation<T: Serialize>(&self, entry: &T) -> Result<(), ()> {
109 match serde_json::to_vec(entry) {
110 Ok(mut vec) => {
111 vec.push(b'\n');
112 match self.try_send_line(LineKind::Operation, vec.into()) {
113 Ok(()) => Ok(()),
114 Err(line) => {
115 tracing::warn!(
116 target: "athena_rs::linux_gateway_file_log",
117 "gateway operation log file channel full; dropping line"
118 );
119 drop(line);
120 Err(())
121 }
122 }
123 }
124 Err(err) => {
125 tracing::warn!(
126 target: "athena_rs::linux_gateway_file_log",
127 error = %err,
128 "failed to serialize gateway operation log line"
129 );
130 Err(())
131 }
132 }
133 }
134
135 pub fn try_enqueue_api_key_auth<T: Serialize>(&self, entry: &T) -> Result<(), ()> {
137 match serde_json::to_vec(entry) {
138 Ok(mut vec) => {
139 vec.push(b'\n');
140 match self.try_send_line(LineKind::ApiKey, vec.into()) {
141 Ok(()) => Ok(()),
142 Err(line) => {
143 tracing::warn!(
144 target: "athena_rs::linux_gateway_file_log",
145 "api_key_auth log file channel full; dropping line"
146 );
147 drop(line);
148 Err(())
149 }
150 }
151 }
152 Err(err) => {
153 tracing::warn!(
154 target: "athena_rs::linux_gateway_file_log",
155 error = %err,
156 "failed to serialize api_key_auth log line"
157 );
158 Err(())
159 }
160 }
161 }
162}
163
/// Checks that `dir` exists (creating it if needed) and accepts file writes.
///
/// A probe file named `.athena_write_probe` is opened in append mode, one
/// newline is written to it, and the file is then removed on a best-effort
/// basis.
///
/// # Errors
///
/// Returns the I/O error from creating the directory, opening the probe file,
/// or writing to it. A failure to remove the probe file is ignored.
fn probe_log_dir(dir: &Path) -> std::io::Result<()> {
    std::fs::create_dir_all(dir)?;

    let marker = dir.join(".athena_write_probe");
    // The handle lives only inside the closure, so the file is closed before
    // the removal below.
    OpenOptions::new()
        .append(true)
        .create(true)
        .open(&marker)
        .and_then(|mut handle| writeln!(handle))?;

    let _ = std::fs::remove_file(marker);
    Ok(())
}
177
178async fn run_linux_file_log_worker(mut rx: mpsc::Receiver<BufferedLine>, log_dir: PathBuf) {
179 let request_path = log_dir.join(GATEWAY_REQUEST_LOG_FILENAME);
180 let operation_path = log_dir.join(GATEWAY_OPERATION_LOG_FILENAME);
181 let api_key_path = log_dir.join(API_KEY_AUTH_LOG_FILENAME);
182
183 let mut tick = tokio::time::interval(BATCH_FLUSH_INTERVAL);
184 tick.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
185 let mut batch: Vec<BufferedLine> = Vec::with_capacity(BATCH_MAX_LINES);
186
187 loop {
188 tokio::select! {
189 biased;
190 msg = rx.recv() => {
191 match msg {
192 Some(line) => {
193 batch.push(line);
194 if batch.len() >= BATCH_MAX_LINES {
195 flush_batch_to_disk(&request_path, &operation_path, &api_key_path, &mut batch).await;
196 }
197 }
198 None => {
199 flush_batch_to_disk(&request_path, &operation_path, &api_key_path, &mut batch).await;
200 return;
201 }
202 }
203 }
204 _ = tick.tick() => {
205 flush_batch_to_disk(&request_path, &operation_path, &api_key_path, &mut batch).await;
206 }
207 }
208 }
209}
210
211async fn flush_batch_to_disk(
212 request_path: &Path,
213 operation_path: &Path,
214 api_key_path: &Path,
215 batch: &mut Vec<BufferedLine>,
216) {
217 if batch.is_empty() {
218 return;
219 }
220
221 let drained: Vec<BufferedLine> = std::mem::take(batch);
222 let req_path = request_path.to_path_buf();
223 let op_path = operation_path.to_path_buf();
224 let key_path = api_key_path.to_path_buf();
225
226 let result = tokio::task::spawn_blocking(move || {
227 let mut request_buf: Vec<u8> = Vec::new();
228 let mut operation_buf: Vec<u8> = Vec::new();
229 let mut api_key_buf: Vec<u8> = Vec::new();
230
231 for line in drained {
232 match line.kind {
233 LineKind::Request => request_buf.extend_from_slice(&line.payload),
234 LineKind::Operation => operation_buf.extend_from_slice(&line.payload),
235 LineKind::ApiKey => api_key_buf.extend_from_slice(&line.payload),
236 }
237 }
238
239 if !request_buf.is_empty() {
240 append_file(&req_path, &request_buf)?;
241 }
242 if !operation_buf.is_empty() {
243 append_file(&op_path, &operation_buf)?;
244 }
245 if !api_key_buf.is_empty() {
246 append_file(&key_path, &api_key_buf)?;
247 }
248 Ok::<(), std::io::Error>(())
249 })
250 .await;
251
252 match result {
253 Ok(Ok(())) => {}
254 Ok(Err(err)) => {
255 tracing::warn!(
256 target: "athena_rs::linux_gateway_file_log",
257 error = %err,
258 "failed to append gateway Linux file logs"
259 );
260 }
261 Err(err) => {
262 tracing::warn!(
263 target: "athena_rs::linux_gateway_file_log",
264 error = %err,
265 "Linux file log flush task failed"
266 );
267 }
268 }
269}
270
/// Appends `bytes` to the file at `path`, creating the file when it does not exist.
///
/// # Errors
///
/// Returns the I/O error from opening the file or from writing to it.
fn append_file(path: &Path, bytes: &[u8]) -> std::io::Result<()> {
    OpenOptions::new()
        .append(true)
        .create(true)
        .open(path)
        .and_then(|mut sink| sink.write_all(bytes))
}
276
#[cfg(test)]
mod tests {
    use super::*;
    use std::fs;

    /// Builds a scratch path under the system temp directory.
    ///
    /// The process id and the current time in nanoseconds are appended so
    /// concurrent runs do not collide. The directory itself is not created.
    fn scratch_dir(name: &str) -> PathBuf {
        std::env::temp_dir().join(format!(
            "athena_linux_flog_{}_{}_{}",
            name,
            std::process::id(),
            std::time::SystemTime::now()
                .duration_since(std::time::UNIX_EPOCH)
                .map(|d| d.as_nanos())
                .unwrap_or(0)
        ))
    }

    /// The probe must fail when the log directory cannot be created inside a
    /// read-only parent.
    #[test]
    #[cfg(unix)]
    fn probe_rejects_non_writable_parent() {
        let tmp = scratch_dir("probe");
        let nested = tmp.join("nested");
        fs::create_dir_all(&nested).expect("mkdir");
        let mut perms = fs::metadata(&nested).expect("meta").permissions();
        perms.set_readonly(true);
        fs::set_permissions(&nested, perms).expect("chmod");

        // A privileged user (e.g. root in a CI container) can still write into
        // a read-only directory, so the precondition cannot be established;
        // skip rather than report a bogus failure.
        if fs::write(nested.join(".privilege_check"), b"").is_ok() {
            let _ = fs::remove_dir_all(&tmp);
            return;
        }

        let dead = nested.join("impossible");
        let outcome = probe_log_dir(&dead);
        // Clean up before asserting so a failure does not leak the directory.
        let _ = fs::remove_dir_all(&tmp);
        assert!(outcome.is_err());
    }

    /// A queued entry must reach the request log as one JSON line.
    #[tokio::test]
    async fn round_trip_ndjson_line() {
        let tmp = scratch_dir("rt");
        fs::create_dir_all(&tmp).expect("mkdir");
        probe_log_dir(&tmp).expect("probe");

        let sink = LinuxGatewayFileLog::spawn(tmp.clone());
        #[derive(Serialize)]
        struct Row {
            k: i32,
        }
        sink.try_enqueue_request(&Row { k: 1 }).expect("enqueue");

        // Poll with a generous deadline instead of a single fixed sleep so a
        // slow machine does not make the test flaky.
        let path = tmp.join(GATEWAY_REQUEST_LOG_FILENAME);
        let mut text = String::new();
        for _ in 0..250 {
            tokio::time::sleep(Duration::from_millis(20)).await;
            match fs::read_to_string(&path) {
                Ok(found) if !found.is_empty() => {
                    text = found;
                    break;
                }
                _ => {}
            }
        }

        // Clean up before asserting so a failure does not leak the directory.
        let _ = fs::remove_dir_all(&tmp);
        assert_eq!(text, "{\"k\":1}\n");
    }
}