// pingap_logger/writer.rs

1// Copyright 2024-2025 Tree xie.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15#[cfg(unix)]
16use super::syslog::new_syslog_writer;
17use super::{Error, LOG_CATEGORY};
18use bytesize::ByteSize;
19use chrono::Timelike;
20use flate2::write::GzEncoder;
21use flate2::Compression;
22use pingap_core::convert_query_map;
23use pingap_core::Error as ServiceError;
24use pingap_core::SimpleServiceTaskFuture;
25use std::fs;
26use std::io;
27#[cfg(unix)]
28use std::os::unix::fs::MetadataExt;
29#[cfg(windows)]
30use std::os::windows::fs::MetadataExt;
31use std::path::{Path, PathBuf};
32use std::sync::Mutex;
33use std::time::{Duration, SystemTime};
34use tracing::{error, info, Level};
35use tracing_subscriber::fmt::writer::BoxMakeWriter;
36use walkdir::WalkDir;
37
38const DEFAULT_COMPRESSION_LEVEL: u8 = 9;
39const DEFAULT_DAYS_AGO: u16 = 7;
40/// Minimum capacity in bytes for buffered log writing. When capacity is specified
41/// below this value, no buffering will be used.
42const MIN_BUFFER_CAPACITY: u64 = 4096;
43
44static GZIP_EXT: &str = "gz";
45static ZSTD_EXT: &str = "zst";
46
47type Result<T, E = Error> = std::result::Result<T, E>;
48
49/// Compresses a file using zstd compression
50///
51/// # Arguments
52/// * `file` - Path to the file to compress
53/// * `level` - Compression level (0 uses default level)
54///
55/// # Returns
56/// A tuple of (compressed_size, original_size) in bytes
57fn zstd_compress(file: &Path, level: u8) -> Result<(u64, u64)> {
58    let level = if level == 0 {
59        DEFAULT_COMPRESSION_LEVEL
60    } else {
61        level
62    };
63    let zst_file = file.with_extension(ZSTD_EXT);
64    let mut original_file =
65        fs::File::open(file).map_err(|e| Error::Io { source: e })?;
66    let file = fs::OpenOptions::new()
67        .read(true)
68        .write(true)
69        .create_new(true)
70        .open(&zst_file)
71        .map_err(|e| Error::Io { source: e })?;
72
73    let mut encoder = zstd::stream::Encoder::new(&file, level as i32)
74        .map_err(|e| Error::Io { source: e })?;
75    let original_size = io::copy(&mut original_file, &mut encoder)
76        .map_err(|e| Error::Io { source: e })?;
77    encoder.finish().map_err(|e| Error::Io { source: e })?;
78    #[cfg(unix)]
79    let size = file.metadata().map(|item| item.size()).unwrap_or_default();
80    #[cfg(windows)]
81    let size = file
82        .metadata()
83        .map(|item| item.file_size())
84        .unwrap_or_default();
85    Ok((size, original_size))
86}
87
88/// Compresses a file using gzip compression
89///
90/// # Arguments
91/// * `file` - Path to the file to compress
92/// * `level` - Compression level (0 uses best compression)
93///
94/// # Returns
95/// A tuple of (compressed_size, original_size) in bytes
96fn gzip_compress(file: &Path, level: u8) -> Result<(u64, u64)> {
97    let gzip_file = file.with_extension(GZIP_EXT);
98    let mut original_file =
99        fs::File::open(file).map_err(|e| Error::Io { source: e })?;
100    let file = fs::OpenOptions::new()
101        .read(true)
102        .write(true)
103        .create_new(true)
104        .open(&gzip_file)
105        .map_err(|e| Error::Io { source: e })?;
106    let level = if level == 0 {
107        Compression::best()
108    } else {
109        Compression::new(level as u32)
110    };
111    let mut encoder = GzEncoder::new(&file, level);
112    let original_size = io::copy(&mut original_file, &mut encoder)
113        .map_err(|e| Error::Io { source: e })?;
114    encoder.finish().map_err(|e| Error::Io { source: e })?;
115    #[cfg(unix)]
116    let size = file.metadata().map(|item| item.size()).unwrap_or_default();
117    #[cfg(windows)]
118    let size = file
119        .metadata()
120        .map(|item| item.file_size())
121        .unwrap_or_default();
122    Ok((size, original_size))
123}
124
/// Parameters for log compression configuration
#[derive(Debug, Clone)]
struct LogCompressParams {
    // compression algorithm name; "gzip" selects gzip, anything else
    // falls through to zstd (see do_compress)
    compression: String,
    // directory that is walked for log files to compress
    path: PathBuf,
    // compression level; 0 means "use the algorithm's default/best"
    level: u8,
    // compress files whose last access is older than this many days
    // (0 falls back to DEFAULT_DAYS_AGO)
    days_ago: u16,
    // local hour of day (0-23) at which the compression pass runs
    time_point_hour: u8,
}
134
135impl Default for LogCompressParams {
136    fn default() -> Self {
137        Self {
138            compression: String::new(),
139            path: PathBuf::new(),
140            level: DEFAULT_COMPRESSION_LEVEL,
141            days_ago: DEFAULT_DAYS_AGO,
142            time_point_hour: 0,
143        }
144    }
145}
146
147/// Performs log file compression based on specified parameters
148///
149/// # Arguments
150/// * `count` - Counter used for timing compression runs
151/// * `params` - Configuration parameters for compression
152///
153/// # Returns
154/// Boolean indicating if compression was performed
155async fn do_compress(
156    count: u32,
157    params: LogCompressParams,
158) -> Result<bool, ServiceError> {
159    const OFFSET: u32 = 60;
160    if count % OFFSET != 0
161        || params.time_point_hour != chrono::Local::now().hour() as u8
162    {
163        return Ok(false);
164    }
165
166    let days_ago = if params.days_ago == 0 {
167        DEFAULT_DAYS_AGO
168    } else {
169        params.days_ago
170    };
171    let access_before = SystemTime::now()
172        .checked_sub(Duration::from_secs(24 * 3600 * days_ago as u64))
173        .ok_or_else(|| ServiceError::Invalid {
174            message: "Failed to calculate access time".to_string(),
175        })?;
176    let compression_exts = [GZIP_EXT.to_string(), ZSTD_EXT.to_string()];
177    for entry in WalkDir::new(&params.path)
178        .into_iter()
179        .filter_map(|e| e.ok())
180    {
181        let ext = entry
182            .path()
183            .extension()
184            .unwrap_or_default()
185            .to_string_lossy()
186            .to_string();
187        if compression_exts.contains(&ext) {
188            continue;
189        }
190        let Ok(metadata) = entry.metadata() else {
191            continue;
192        };
193        let Ok(accessed) = metadata.accessed() else {
194            continue;
195        };
196        if accessed > access_before {
197            continue;
198        }
199        let start = SystemTime::now();
200        let result = if params.compression == "gzip" {
201            gzip_compress(entry.path(), params.level)
202        } else {
203            zstd_compress(entry.path(), params.level)
204        };
205        let file = entry.path().to_string_lossy().to_string();
206        match result {
207            Err(e) => {
208                error!(
209                    category = LOG_CATEGORY,
210                    error = %e,
211                    file,
212                    "compress log fail"
213                );
214            },
215            Ok((size, original_size)) => {
216                let elapsed = format!("{}ms", pingap_util::elapsed_ms(start));
217                info!(
218                    category = LOG_CATEGORY,
219                    file,
220                    elapsed,
221                    original_size = ByteSize::b(original_size).to_string(),
222                    size = ByteSize::b(size).to_string(),
223                    "compress log success",
224                );
225                // ignore remove
226                let _ = fs::remove_file(entry.path());
227            },
228        }
229    }
230    Ok(true)
231}
232
233/// Creates a new log compression service task
234///
235/// # Arguments
236/// * `params` - Configuration parameters for the compression service
237///
238/// # Returns
239/// Optional tuple containing service name and task future
240fn new_log_compress_service(
241    params: LogCompressParams,
242) -> Option<(String, SimpleServiceTaskFuture)> {
243    let task: SimpleServiceTaskFuture = Box::new(move |count: u32| {
244        Box::pin({
245            let value = params.clone();
246            async move {
247                let value = value.clone();
248                do_compress(count, value).await
249            }
250        })
251    });
252    Some(("log_compress".to_string(), task))
253}
254
/// Parameters for logger configuration
#[derive(Default, Debug)]
pub struct LoggerParams {
    /// Log destination: empty for stderr, `syslog://...` for syslog (unix
    /// only), otherwise a file or directory path, optionally with a query
    /// string (`?rolling=...&compression=...`)
    pub log: String,
    /// Log level name ("trace"/"debug"/"warn"/"error", anything else is
    /// INFO); empty falls back to the RUST_LOG environment variable
    pub level: String,
    /// Write-buffer capacity in bytes; values below MIN_BUFFER_CAPACITY
    /// disable buffering
    pub capacity: u64,
    /// Emit JSON-formatted log events instead of plain text
    pub json: bool,
}
263
/// Builds a file-based log writer and, when compression is configured, a
/// background compression service task.
///
/// The log value may carry a query string with options:
/// `rolling` (minutely|hourly|never|default daily), `compression`,
/// `level`, `days_ago` and `time_point_hour` (consumed by the
/// compression task).
///
/// # Arguments
/// * `params` - Logger configuration (log path/query, buffer capacity)
///
/// # Returns
/// The boxed writer and an optional (name, task) pair for log compression
fn new_file_writer(
    params: &LoggerParams,
) -> Result<(BoxMakeWriter, Option<(String, SimpleServiceTaskFuture)>)> {
    let mut file = pingap_util::resolve_path(&params.log);
    let mut rolling_type = "".to_string();
    let mut compression = "".to_string();
    let mut level = 0;
    let mut days_ago = 0;
    let mut time_point_hour = 0;
    let mut task = None;
    // strip and parse the optional "?key=value" configuration suffix
    if let Some((_, query)) = params.log.split_once('?') {
        file = file.replace(&format!("?{query}"), "");
        let m = convert_query_map(query);
        if let Some(value) = m.get("rolling") {
            rolling_type = value.to_string();
        }
        if let Some(value) = m.get("compression") {
            compression = value.to_string();
        }
        if let Some(value) = m.get("level") {
            // invalid numbers silently fall back to 0 (= default level)
            level = value.parse::<u8>().unwrap_or_default();
        }
        if let Some(value) = m.get("days_ago") {
            days_ago = value.parse::<u16>().unwrap_or_default();
        }
        if let Some(value) = m.get("time_point_hour") {
            time_point_hour = value.parse::<u8>().unwrap_or_default();
        }
    }

    // an existing directory is used as-is; otherwise the parent directory
    // of the file path is used (and created below if missing)
    let filepath = Path::new(&file);
    let dir = if filepath.is_dir() {
        filepath
    } else {
        filepath.parent().ok_or_else(|| Error::Invalid {
            message: "parent of file log is invalid".to_string(),
        })?
    };
    fs::create_dir_all(dir).map_err(|e| Error::Io { source: e })?;
    if !compression.is_empty() {
        task = new_log_compress_service(LogCompressParams {
            compression,
            path: dir.to_path_buf(),
            days_ago,
            level,
            time_point_hour,
        });
    }

    // NOTE(review): is_dir() is re-checked here, after create_dir_all; a
    // path that only became a directory above still yields its last
    // component as the filename — presumably intentional, confirm
    let filename = if filepath.is_dir() {
        "".to_string()
    } else {
        filepath
            .file_name()
            .ok_or_else(|| Error::Invalid {
                message: "file log is invalid".to_string(),
            })?
            .to_string_lossy()
            .to_string()
    };
    // unknown rolling types fall back to daily rotation
    let file_appender = match rolling_type.as_str() {
        "minutely" => tracing_appender::rolling::minutely(dir, filename),
        "hourly" => tracing_appender::rolling::hourly(dir, filename),
        "never" => tracing_appender::rolling::never(dir, filename),
        _ => tracing_appender::rolling::daily(dir, filename),
    };

    let writer = if params.capacity < MIN_BUFFER_CAPACITY {
        BoxMakeWriter::new(file_appender)
    } else {
        // buffer writer for better performance; the Mutex makes the
        // BufWriter usable as a MakeWriter from multiple threads
        let w = io::BufWriter::with_capacity(
            params.capacity as usize,
            file_appender,
        );
        BoxMakeWriter::new(Mutex::new(w))
    };
    Ok((writer, task))
}
343
344/// Initializes the logging system with the specified configuration
345///
346/// # Arguments
347/// * `params` - Logger configuration parameters
348///
349/// # Returns
350/// Optional tuple containing compression service name and task future if compression is enabled
351pub fn logger_try_init(
352    params: LoggerParams,
353) -> Result<Option<(String, SimpleServiceTaskFuture)>> {
354    let level = if params.level.is_empty() {
355        std::env::var("RUST_LOG").unwrap_or("INFO".to_string())
356    } else {
357        params.level.clone()
358    };
359
360    let level = match level.to_lowercase().as_str() {
361        "trace" => Level::TRACE,
362        "debug" => Level::DEBUG,
363        "warn" => Level::WARN,
364        "error" => Level::ERROR,
365        _ => Level::INFO,
366    };
367
368    let seconds = chrono::Local::now().offset().local_minus_utc();
369    let hours = (seconds / 3600) as i8;
370    let minutes = ((seconds % 3600) / 60) as i8;
371    let is_dev = cfg!(debug_assertions);
372
373    let builder = tracing_subscriber::fmt()
374        .with_max_level(level)
375        .with_ansi(is_dev)
376        .with_timer(tracing_subscriber::fmt::time::OffsetTime::new(
377            time::UtcOffset::from_hms(hours, minutes, 0).unwrap(),
378            time::format_description::well_known::Rfc3339,
379        ))
380        .with_target(is_dev);
381    let mut task = None;
382    let mut log_type = "stdio";
383    let writer = if params.log.is_empty() {
384        BoxMakeWriter::new(std::io::stderr)
385    } else if params.log.starts_with("syslog://") {
386        #[cfg(unix)]
387        {
388            new_syslog_writer(&params.log)?
389        }
390        #[cfg(not(unix))]
391        {
392            return Err(Error::Invalid {
393                message: "syslog is only supported on Unix systems".to_string(),
394            });
395        }
396    } else {
397        log_type = "file";
398        let (w, t) = new_file_writer(&params)?;
399        task = t;
400        w
401    };
402    if params.json {
403        builder
404            .event_format(tracing_subscriber::fmt::format::json())
405            .with_writer(writer)
406            .init();
407    } else {
408        builder.with_writer(writer).init();
409    }
410
411    info!(
412        category = LOG_CATEGORY,
413        capacity = params.capacity,
414        log_type,
415        level = level.to_string(),
416        json_format = params.json,
417        utc_offset = chrono::Local::now().offset().to_string(),
418        "init tracing subscriber success",
419    );
420
421    Ok(task)
422}