// pingap_logger/writer.rs
1// Copyright 2024-2025 Tree xie.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use super::file_appender::new_rolling_file_writer;
16use super::new_env_filter;
17#[cfg(unix)]
18use super::syslog::new_syslog_writer;
19use super::{Error, LOG_TARGET};
20use async_trait::async_trait;
21use bytesize::ByteSize;
22use chrono::Timelike;
23use flate2::Compression;
24use flate2::write::GzEncoder;
25use pingap_core::BackgroundTask;
26use pingap_core::Error as ServiceError;
27use std::collections::HashSet;
28use std::fs;
29use std::io;
30#[cfg(unix)]
31use std::os::unix::fs::MetadataExt;
32#[cfg(windows)]
33use std::os::windows::fs::MetadataExt;
34use std::path::Path;
35use std::sync::Mutex;
36use std::time::Instant;
37use std::time::{Duration, SystemTime};
38use tracing::Subscriber;
39use tracing::{error, info};
40use tracing_subscriber::fmt::writer::BoxMakeWriter;
41use tracing_subscriber::layer::SubscriberExt;
42use tracing_subscriber::reload::Handle;
43use tracing_subscriber::reload::Layer;
44use tracing_subscriber::{EnvFilter, Registry};
45use walkdir::WalkDir;
46
// Compression level used when the caller passes 0 (shared by gzip and zstd;
// each codec additionally clamps to its own maximum).
const DEFAULT_COMPRESSION_LEVEL: u8 = 9;
// Age threshold in days used when the configured `days_ago` is 0.
const DEFAULT_DAYS_AGO: u16 = 7;
/// Minimum capacity in bytes for buffered log writing. When capacity is specified
/// below this value, no buffering will be used.
const MIN_BUFFER_CAPACITY: u64 = 4096;

// File extensions produced by the two supported compression formats; files
// already carrying one of these extensions are skipped by `do_compress`.
static GZIP_EXT: &str = "gz";
static ZSTD_EXT: &str = "zst";

// Crate-local result alias defaulting to this module's `Error`.
type Result<T, E = Error> = std::result::Result<T, E>;

// Handle used to swap the active `EnvFilter` (log level) at runtime.
pub type LoggerReloadHandle = Handle<EnvFilter, Registry>;
59
60/// Compresses a file using zstd compression
61///
62/// # Arguments
63/// * `file` - Path to the file to compress
64/// * `level` - Compression level (0 uses default level)
65///
66/// # Returns
67/// A tuple of (compressed_size, original_size) in bytes
68fn zstd_compress(file: &Path, level: u8) -> Result<(u64, u64)> {
69    let level = if level == 0 {
70        DEFAULT_COMPRESSION_LEVEL
71    } else {
72        level
73    }
74    .min(22);
75    let zst_file = file.with_extension(ZSTD_EXT);
76    let mut original_file =
77        fs::File::open(file).map_err(|e| Error::Io { source: e })?;
78    let file = fs::OpenOptions::new()
79        .read(true)
80        .write(true)
81        .create_new(true)
82        .open(&zst_file)
83        .map_err(|e| Error::Io { source: e })?;
84
85    let mut encoder = zstd::stream::Encoder::new(&file, level as i32)
86        .map_err(|e| Error::Io { source: e })?;
87    let original_size = io::copy(&mut original_file, &mut encoder)
88        .map_err(|e| Error::Io { source: e })?;
89    encoder.finish().map_err(|e| Error::Io { source: e })?;
90    #[cfg(unix)]
91    let size = file.metadata().map(|item| item.size()).unwrap_or_default();
92    #[cfg(windows)]
93    let size = file
94        .metadata()
95        .map(|item| item.file_size())
96        .unwrap_or_default();
97    Ok((size, original_size))
98}
99
100/// Compresses a file using gzip compression
101///
102/// # Arguments
103/// * `file` - Path to the file to compress
104/// * `level` - Compression level (0 uses best compression)
105///
106/// # Returns
107/// A tuple of (compressed_size, original_size) in bytes
108fn gzip_compress(file: &Path, level: u8) -> Result<(u64, u64)> {
109    let gzip_file = file.with_extension(GZIP_EXT);
110    let mut original_file =
111        fs::File::open(file).map_err(|e| Error::Io { source: e })?;
112    let file = fs::OpenOptions::new()
113        .read(true)
114        .write(true)
115        .create_new(true)
116        .open(&gzip_file)
117        .map_err(|e| Error::Io { source: e })?;
118    let level = if level == 0 {
119        Compression::best()
120    } else {
121        Compression::new(level.min(9) as u32)
122    };
123    let mut encoder = GzEncoder::new(&file, level);
124    let original_size = io::copy(&mut original_file, &mut encoder)
125        .map_err(|e| Error::Io { source: e })?;
126    encoder.finish().map_err(|e| Error::Io { source: e })?;
127    #[cfg(unix)]
128    let size = file.metadata().map(|item| item.size()).unwrap_or_default();
129    #[cfg(windows)]
130    let size = file
131        .metadata()
132        .map(|item| item.file_size())
133        .unwrap_or_default();
134    Ok((size, original_size))
135}
136
/// Configuration for the periodic log compression task.
#[derive(Debug, Clone, Default)]
pub struct LogCompressParams {
    dirs: Vec<String>,
    compression: String,
    level: u8,
    days_ago: u16,
    time_point_hour: u8,
}

impl LogCompressParams {
    /// Creates parameters covering the given log directories, with every
    /// other option left at its default.
    pub fn new(dirs: Vec<String>) -> Self {
        LogCompressParams {
            dirs,
            ..LogCompressParams::default()
        }
    }
    /// Selects the compression algorithm ("gzip"; anything else means zstd).
    pub fn set_compression(&mut self, compression: String) {
        self.compression = compression;
    }
    /// Sets the compression level (0 means the codec default).
    pub fn set_level(&mut self, level: u8) {
        self.level = level;
    }
    /// Sets how many days a file must be untouched before compression
    /// (0 falls back to the module default).
    pub fn set_days_ago(&mut self, days_ago: u16) {
        self.days_ago = days_ago;
    }
    /// Sets the local hour of day at which compression is allowed to run.
    pub fn set_time_point_hour(&mut self, time_point_hour: u8) {
        self.time_point_hour = time_point_hour;
    }
}
167
168/// Performs log file compression based on specified parameters
169///
170/// # Arguments
171/// * `count` - Counter used for timing compression runs
172/// * `params` - Configuration parameters for compression
173///
174/// # Returns
175/// Boolean indicating if compression was performed
176async fn do_compress(
177    count: u32,
178    params: &LogCompressParams,
179) -> Result<bool, ServiceError> {
180    const OFFSET: u32 = 60;
181    if !count.is_multiple_of(OFFSET)
182        || params.time_point_hour != chrono::Local::now().hour() as u8
183    {
184        return Ok(false);
185    }
186
187    let days_ago = if params.days_ago == 0 {
188        DEFAULT_DAYS_AGO
189    } else {
190        params.days_ago
191    };
192    let access_before = SystemTime::now()
193        .checked_sub(Duration::from_secs(24 * 3600 * days_ago as u64))
194        .ok_or_else(|| ServiceError::Invalid {
195            message: "Failed to calculate access time".to_string(),
196        })?;
197    let compression_exts = [GZIP_EXT.to_string(), ZSTD_EXT.to_string()];
198    let unique_paths: HashSet<String> = params.dirs.iter().cloned().collect();
199
200    for path in unique_paths {
201        for entry in WalkDir::new(path).into_iter().filter_map(|e| e.ok()) {
202            let ext = entry
203                .path()
204                .extension()
205                .unwrap_or_default()
206                .to_string_lossy()
207                .to_string();
208            if compression_exts.contains(&ext) {
209                continue;
210            }
211            let Ok(metadata) = entry.metadata() else {
212                continue;
213            };
214            let Ok(accessed) = metadata.accessed() else {
215                continue;
216            };
217            if accessed > access_before {
218                continue;
219            }
220            let start = Instant::now();
221            let result = if params.compression == "gzip" {
222                gzip_compress(entry.path(), params.level)
223            } else {
224                zstd_compress(entry.path(), params.level)
225            };
226            let file = entry.path().to_string_lossy().to_string();
227            match result {
228                Err(e) => {
229                    error!(
230                        target: LOG_TARGET,
231                        error = %e,
232                        file,
233                        "compress log fail"
234                    );
235                },
236                Ok((size, original_size)) => {
237                    let elapsed = format!("{}ms", start.elapsed().as_millis());
238                    info!(
239                        target: LOG_TARGET,
240                        file,
241                        elapsed,
242                        original_size = ByteSize::b(original_size).to_string(),
243                        size = ByteSize::b(size).to_string(),
244                        "compress log success",
245                    );
246                    // ignore remove
247                    let _ = fs::remove_file(entry.path());
248                },
249            }
250        }
251    }
252    Ok(true)
253}
254
// Background task wrapper that owns the compression configuration and is
// driven by the shared background-task scheduler.
struct LogCompressTask {
    params: LogCompressParams,
}

#[async_trait]
impl BackgroundTask for LogCompressTask {
    // Invoked periodically by the scheduler; `count` is the tick counter
    // that `do_compress` uses to decide whether this cycle should run.
    // Always reports success to the scheduler; per-file failures are only
    // logged inside `do_compress`.
    async fn execute(&self, count: u32) -> Result<bool, ServiceError> {
        do_compress(count, &self.params).await?;
        Ok(true)
    }
}
266
/// Creates a new log compression background task
///
/// # Arguments
/// * `params` - Configuration parameters for the compression service
///
/// # Returns
/// A boxed `BackgroundTask` that performs log compression when executed
/// by the background scheduler
pub fn new_log_compress_service(
    params: LogCompressParams,
) -> Box<dyn BackgroundTask> {
    Box::new(LogCompressTask { params })
}
279
/// Parameters for logger configuration
#[derive(Default, Debug)]
pub struct LoggerParams {
    // log target: empty for stderr, "syslog://…" for syslog (Unix only),
    // otherwise a file path (optionally with "?key=value" options)
    pub log: String,
    // log level string; empty falls back to RUST_LOG, then "INFO"
    pub level: String,
    // buffer capacity in bytes for file logging; values below
    // MIN_BUFFER_CAPACITY disable buffering
    pub capacity: u64,
    // emit JSON-formatted log records instead of plain text
    pub json: bool,
}
288
289fn new_file_writer(params: &LoggerParams) -> Result<(BoxMakeWriter, String)> {
290    let rolling_file_writer = new_rolling_file_writer(&params.log)?;
291    let file = params
292        .log
293        .split_once('?')
294        .unwrap_or((params.log.as_str(), ""))
295        .0;
296
297    let filepath = Path::new(&file);
298    let dir = if filepath.is_dir() {
299        filepath
300    } else {
301        filepath.parent().ok_or_else(|| Error::Invalid {
302            message: "parent of file log is invalid".to_string(),
303        })?
304    };
305
306    let writer = if params.capacity < MIN_BUFFER_CAPACITY {
307        BoxMakeWriter::new(rolling_file_writer.writer)
308    } else {
309        // buffer writer for better performance
310        let w = io::BufWriter::with_capacity(
311            params.capacity as usize,
312            rolling_file_writer.writer,
313        );
314        BoxMakeWriter::new(Mutex::new(w))
315    };
316    Ok((writer, dir.to_string_lossy().to_string()))
317}
318
/// Initializes the logging system with the specified configuration
///
/// Builds a reloadable `EnvFilter` layer, selects a writer (stderr, syslog,
/// or file), and installs the resulting subscriber as the global default.
/// Calling this twice fails, since the global subscriber can only be set once.
///
/// # Arguments
/// * `params` - Logger configuration parameters
///
/// # Returns
/// A handle for reloading the level filter at runtime, plus the log
/// directory when a file-based log target is configured
pub fn logger_try_init(
    params: LoggerParams,
) -> Result<(LoggerReloadHandle, Option<String>)> {
    // explicit level wins; otherwise fall back to RUST_LOG, then "INFO"
    let level = if params.level.is_empty() {
        std::env::var("RUST_LOG").unwrap_or("INFO".to_string())
    } else {
        params.level.clone()
    };

    // local UTC offset split into whole hours and remaining minutes
    // (both components keep the sign of the offset)
    let seconds = chrono::Local::now().offset().local_minus_utc();
    let hours = (seconds / 3600) as i8;
    let minutes = ((seconds % 3600) / 60) as i8;
    let is_dev = cfg!(debug_assertions);

    // reloadable filter layer so the level can be changed without restart
    let initial_filter = new_env_filter(&level);
    let (filter_layer, reload_handle) = Layer::new(initial_filter);
    let registry = tracing_subscriber::registry().with(filter_layer);

    let mut log_path = None;
    let mut log_type = "stdio";
    let writer = if params.log.is_empty() {
        BoxMakeWriter::new(std::io::stderr)
    } else if params.log.starts_with("syslog://") {
        #[cfg(unix)]
        {
            new_syslog_writer(&params.log)?
        }
        #[cfg(not(unix))]
        {
            return Err(Error::Invalid {
                message: "syslog is only supported on Unix systems".to_string(),
            });
        }
    } else {
        log_type = "file";
        let (w, dir) = new_file_writer(&params)?;
        // surfaced to the caller so compression can be set up for this dir
        log_path = Some(dir);
        w
    };
    // RFC3339 timestamps rendered in the local UTC offset; falls back to
    // UTC if the offset cannot be represented
    let timer = tracing_subscriber::fmt::time::OffsetTime::new(
        time::UtcOffset::from_hms(hours, minutes, 0)
            .unwrap_or(time::UtcOffset::UTC),
        time::format_description::well_known::Rfc3339,
    );

    if params.json {
        // JSON output: never colored; target field only in dev builds
        let fmt_layer = tracing_subscriber::fmt::layer()
            .with_ansi(false)
            .with_timer(timer)
            .with_target(is_dev)
            .with_writer(writer)
            .json();
        let subscriber = registry.with(fmt_layer);
        let boxed_subscriber: Box<dyn Subscriber + Send + Sync> =
            Box::new(subscriber);

        // set as global default
        tracing::subscriber::set_global_default(boxed_subscriber).map_err(
            |e| Error::Invalid {
                message: e.to_string(),
            },
        )?
    } else {
        let fmt_layer = tracing_subscriber::fmt::layer()
            .with_ansi(is_dev) // text format with color if dev
            .with_timer(timer)
            .with_target(is_dev)
            .with_writer(writer);

        let subscriber = registry.with(fmt_layer);
        let boxed_subscriber: Box<dyn Subscriber + Send + Sync> =
            Box::new(subscriber);

        tracing::subscriber::set_global_default(boxed_subscriber).map_err(
            |e| Error::Invalid {
                message: e.to_string(),
            },
        )?
    }

    info!(
        target: LOG_TARGET,
        capacity = params.capacity,
        log_type,
        level = level.to_string(),
        json_format = params.json,
        utc_offset = chrono::Local::now().offset().to_string(),
        "init tracing subscriber success",
    );

    Ok((reload_handle, log_path))
}