pingap_logger/
async_logger.rs1use super::LOG_TARGET;
16use super::file_appender::new_rolling_file_writer;
17use async_trait::async_trait;
18use bytes::BytesMut;
19use pingap_core::Error;
20use pingora::server::ShutdownWatch;
21use pingora::services::background::BackgroundService;
22use serde::{Deserialize, Serialize};
23use std::io::{BufWriter, Write};
24use std::time::Duration;
25use tokio::sync::Mutex;
26use tokio::sync::mpsc::{Receiver, Sender, channel};
27use tracing::{error, info};
28use tracing_appender::rolling::RollingFileAppender;
29
30type Result<T> = std::result::Result<T, Error>;
31
32pub struct AsyncLoggerTask {
33 dir: String,
34 path: String,
35 channel_buffer: usize,
36 receiver: Mutex<Option<Receiver<BytesMut>>>,
37 writer: Mutex<Option<BufWriter<RollingFileAppender>>>,
38 flush_timeout: Duration,
39}
40impl AsyncLoggerTask {
41 pub fn get_dir(&self) -> String {
42 self.dir.clone()
43 }
44}
45
46#[derive(Debug, PartialEq, Deserialize, Serialize, Default)]
47struct AsyncLoggerWriterParams {
48 channel_buffer: Option<usize>,
49 #[serde(default)]
50 #[serde(with = "humantime_serde")]
51 flush_timeout: Option<Duration>,
52}
53
54pub async fn new_async_logger(
55 path: &str,
56) -> Result<(Sender<BytesMut>, AsyncLoggerTask)> {
57 let original_path = path.to_string();
58 let (path, query) = path.split_once('?').unwrap_or((path, ""));
59 let params: AsyncLoggerWriterParams =
60 serde_qs::from_str(query).unwrap_or_default();
61
62 let rolling_file_writer =
63 new_rolling_file_writer(&original_path).map_err(|e| {
64 Error::Invalid {
65 message: format!("{}: {}", original_path, e),
66 }
67 })?;
68
69 let buffered_writer = BufWriter::new(rolling_file_writer.writer);
70 let channel_buffer = params.channel_buffer.unwrap_or(1000);
71 let flush_timeout = params.flush_timeout.unwrap_or(Duration::from_secs(10));
72
73 let (tx, rx) = channel::<BytesMut>(channel_buffer);
74
75 let task = AsyncLoggerTask {
76 dir: rolling_file_writer.dir,
77 channel_buffer,
78 path: path.to_string(),
79 receiver: Mutex::new(Some(rx)),
80 writer: Mutex::new(Some(buffered_writer)),
81 flush_timeout,
82 };
83
84 Ok((tx, task))
85}
86
87#[async_trait]
88impl BackgroundService for AsyncLoggerTask {
89 async fn start(&self, mut shutdown: ShutdownWatch) {
90 let Some(mut receiver) = self.receiver.lock().await.take() else {
91 return;
92 };
93 let Some(mut writer) = self.writer.lock().await.take() else {
94 return;
95 };
96 info!(
97 target: LOG_TARGET,
98 path = self.path,
99 channel_buffer = self.channel_buffer,
100 flush_timeout = format!("{:?}", self.flush_timeout),
101 "async logger is running",
102 );
103 const MAX_BATCH_SIZE: usize = 128;
104 let mut interval = tokio::time::interval(self.flush_timeout);
105
106 loop {
107 tokio::select! {
108 _ = shutdown.changed() => {
109 break;
110 }
111 Some(msg) = receiver.recv() => {
112 let mut messages = Vec::with_capacity(MAX_BATCH_SIZE);
113 messages.push(msg);
114 while messages.len() < MAX_BATCH_SIZE {
115 match receiver.try_recv() {
116 Ok(msg) => {
117 messages.push(msg);
118 }
119 Err(_) => break,
120 }
121 }
122 for mut msg in messages {
123 msg.extend_from_slice(b"\n");
124 if let Err(e) = writer.write(&msg) {
125 error!(
126 target: LOG_TARGET,
127 error = %e,
128 "write fail",
129 );
130 }
131 }
132 }
133 _ = interval.tick() => {
134 if let Err(e) = writer.flush() {
135 error!(
136 target: LOG_TARGET,
137 error = %e,
138 "flush fail",
139 );
140 }
141 }
142 else => {
143 break;
145 }
146 }
147 }
148 while let Some(mut msg) = receiver.recv().await {
150 msg.extend_from_slice(b"\n");
151 if let Err(e) = writer.write_all(&msg) {
152 error!(
153 target: LOG_TARGET,
154 error = %e,
155 "write fail",
156 );
157 }
158 }
159
160 if let Err(e) = writer.flush() {
162 error!(
163 target: LOG_TARGET,
164 error = %e,
165 "flush fail",
166 );
167 }
168 }
169}