Skip to main content

pingap_logger/
async_logger.rs

1// Copyright 2024-2025 Tree xie.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use super::LOG_TARGET;
16use super::file_appender::new_rolling_file_writer;
17use async_trait::async_trait;
18use bytes::BytesMut;
19use pingap_core::Error;
20use pingora::server::ShutdownWatch;
21use pingora::services::background::BackgroundService;
22use serde::{Deserialize, Serialize};
23use std::io::{BufWriter, Write};
24use std::time::Duration;
25use tokio::sync::Mutex;
26use tokio::sync::mpsc::{Receiver, Sender, channel};
27use tracing::{error, info};
28use tracing_appender::rolling::RollingFileAppender;
29
30type Result<T> = std::result::Result<T, Error>;
31
32pub struct AsyncLoggerTask {
33    dir: String,
34    path: String,
35    channel_buffer: usize,
36    receiver: Mutex<Option<Receiver<BytesMut>>>,
37    writer: Mutex<Option<BufWriter<RollingFileAppender>>>,
38    flush_timeout: Duration,
39}
40impl AsyncLoggerTask {
41    pub fn get_dir(&self) -> String {
42        self.dir.clone()
43    }
44}
45
46#[derive(Debug, PartialEq, Deserialize, Serialize, Default)]
47struct AsyncLoggerWriterParams {
48    channel_buffer: Option<usize>,
49    #[serde(default)]
50    #[serde(with = "humantime_serde")]
51    flush_timeout: Option<Duration>,
52}
53
54pub async fn new_async_logger(
55    path: &str,
56) -> Result<(Sender<BytesMut>, AsyncLoggerTask)> {
57    let original_path = path.to_string();
58    let (path, query) = path.split_once('?').unwrap_or((path, ""));
59    let params: AsyncLoggerWriterParams =
60        serde_qs::from_str(query).unwrap_or_default();
61
62    let rolling_file_writer =
63        new_rolling_file_writer(&original_path).map_err(|e| {
64            Error::Invalid {
65                message: format!("{}: {}", original_path, e),
66            }
67        })?;
68
69    let buffered_writer = BufWriter::new(rolling_file_writer.writer);
70    let channel_buffer = params.channel_buffer.unwrap_or(1000);
71    let flush_timeout = params.flush_timeout.unwrap_or(Duration::from_secs(10));
72
73    let (tx, rx) = channel::<BytesMut>(channel_buffer);
74
75    let task = AsyncLoggerTask {
76        dir: rolling_file_writer.dir,
77        channel_buffer,
78        path: path.to_string(),
79        receiver: Mutex::new(Some(rx)),
80        writer: Mutex::new(Some(buffered_writer)),
81        flush_timeout,
82    };
83
84    Ok((tx, task))
85}
86
87#[async_trait]
88impl BackgroundService for AsyncLoggerTask {
89    async fn start(&self, mut shutdown: ShutdownWatch) {
90        let Some(mut receiver) = self.receiver.lock().await.take() else {
91            return;
92        };
93        let Some(mut writer) = self.writer.lock().await.take() else {
94            return;
95        };
96        info!(
97            target: LOG_TARGET,
98            path = self.path,
99            channel_buffer = self.channel_buffer,
100            flush_timeout = format!("{:?}", self.flush_timeout),
101            "async logger is running",
102        );
103        const MAX_BATCH_SIZE: usize = 128;
104        let mut interval = tokio::time::interval(self.flush_timeout);
105
106        loop {
107            tokio::select! {
108                _ = shutdown.changed() => {
109                    break;
110                }
111                Some(msg) = receiver.recv() => {
112                    let mut messages = Vec::with_capacity(MAX_BATCH_SIZE);
113                    messages.push(msg);
114                    while messages.len() < MAX_BATCH_SIZE {
115                        match receiver.try_recv() {
116                            Ok(msg) => {
117                                messages.push(msg);
118                            }
119                            Err(_) => break,
120                        }
121                    }
122                    for mut msg in messages {
123                        msg.extend_from_slice(b"\n");
124                        if let Err(e) = writer.write(&msg) {
125                            error!(
126                                target: LOG_TARGET,
127                                error = %e,
128                                "write fail",
129                            );
130                        }
131                    }
132                }
133                _ = interval.tick() => {
134                    if let Err(e) = writer.flush() {
135                        error!(
136                            target: LOG_TARGET,
137                            error = %e,
138                            "flush fail",
139                        );
140                    }
141                }
142                else => {
143                    // `recv()` return None, all senders are gone
144                    break;
145                }
146            }
147        }
148        // clear channel
149        while let Some(mut msg) = receiver.recv().await {
150            msg.extend_from_slice(b"\n");
151            if let Err(e) = writer.write_all(&msg) {
152                error!(
153                    target: LOG_TARGET,
154                    error = %e,
155                    "write fail",
156                );
157            }
158        }
159
160        // flush writer
161        if let Err(e) = writer.flush() {
162            error!(
163                target: LOG_TARGET,
164                error = %e,
165                "flush fail",
166            );
167        }
168    }
169}