1use super::file_appender::new_rolling_file_writer;
16use super::new_env_filter;
17#[cfg(unix)]
18use super::syslog::new_syslog_writer;
19use super::{Error, LOG_TARGET};
20use async_trait::async_trait;
21use bytesize::ByteSize;
22use chrono::Timelike;
23use flate2::Compression;
24use flate2::write::GzEncoder;
25use pingap_core::BackgroundTask;
26use pingap_core::Error as ServiceError;
27use std::collections::HashSet;
28use std::fs;
29use std::io;
30#[cfg(unix)]
31use std::os::unix::fs::MetadataExt;
32#[cfg(windows)]
33use std::os::windows::fs::MetadataExt;
34use std::path::Path;
35use std::sync::Mutex;
36use std::time::Instant;
37use std::time::{Duration, SystemTime};
38use tracing::Subscriber;
39use tracing::{error, info};
40use tracing_subscriber::fmt::writer::BoxMakeWriter;
41use tracing_subscriber::layer::SubscriberExt;
42use tracing_subscriber::reload::Handle;
43use tracing_subscriber::reload::Layer;
44use tracing_subscriber::{EnvFilter, Registry};
45use walkdir::WalkDir;
46
47const DEFAULT_COMPRESSION_LEVEL: u8 = 9;
48const DEFAULT_DAYS_AGO: u16 = 7;
49const MIN_BUFFER_CAPACITY: u64 = 4096;
52
53static GZIP_EXT: &str = "gz";
54static ZSTD_EXT: &str = "zst";
55
56type Result<T, E = Error> = std::result::Result<T, E>;
57
58pub type LoggerReloadHandle = Handle<EnvFilter, Registry>;
59
60fn zstd_compress(file: &Path, level: u8) -> Result<(u64, u64)> {
69 let level = if level == 0 {
70 DEFAULT_COMPRESSION_LEVEL
71 } else {
72 level
73 }
74 .min(22);
75 let zst_file = file.with_extension(ZSTD_EXT);
76 let mut original_file =
77 fs::File::open(file).map_err(|e| Error::Io { source: e })?;
78 let file = fs::OpenOptions::new()
79 .read(true)
80 .write(true)
81 .create_new(true)
82 .open(&zst_file)
83 .map_err(|e| Error::Io { source: e })?;
84
85 let mut encoder = zstd::stream::Encoder::new(&file, level as i32)
86 .map_err(|e| Error::Io { source: e })?;
87 let original_size = io::copy(&mut original_file, &mut encoder)
88 .map_err(|e| Error::Io { source: e })?;
89 encoder.finish().map_err(|e| Error::Io { source: e })?;
90 #[cfg(unix)]
91 let size = file.metadata().map(|item| item.size()).unwrap_or_default();
92 #[cfg(windows)]
93 let size = file
94 .metadata()
95 .map(|item| item.file_size())
96 .unwrap_or_default();
97 Ok((size, original_size))
98}
99
100fn gzip_compress(file: &Path, level: u8) -> Result<(u64, u64)> {
109 let gzip_file = file.with_extension(GZIP_EXT);
110 let mut original_file =
111 fs::File::open(file).map_err(|e| Error::Io { source: e })?;
112 let file = fs::OpenOptions::new()
113 .read(true)
114 .write(true)
115 .create_new(true)
116 .open(&gzip_file)
117 .map_err(|e| Error::Io { source: e })?;
118 let level = if level == 0 {
119 Compression::best()
120 } else {
121 Compression::new(level.min(9) as u32)
122 };
123 let mut encoder = GzEncoder::new(&file, level);
124 let original_size = io::copy(&mut original_file, &mut encoder)
125 .map_err(|e| Error::Io { source: e })?;
126 encoder.finish().map_err(|e| Error::Io { source: e })?;
127 #[cfg(unix)]
128 let size = file.metadata().map(|item| item.size()).unwrap_or_default();
129 #[cfg(windows)]
130 let size = file
131 .metadata()
132 .map(|item| item.file_size())
133 .unwrap_or_default();
134 Ok((size, original_size))
135}
136
137#[derive(Debug, Clone, Default)]
139pub struct LogCompressParams {
140 dirs: Vec<String>,
141 compression: String,
142 level: u8,
143 days_ago: u16,
144 time_point_hour: u8,
145}
146
147impl LogCompressParams {
148 pub fn new(dirs: Vec<String>) -> Self {
149 Self {
150 dirs,
151 ..Default::default()
152 }
153 }
154 pub fn set_compression(&mut self, compression: String) {
155 self.compression = compression;
156 }
157 pub fn set_level(&mut self, level: u8) {
158 self.level = level;
159 }
160 pub fn set_days_ago(&mut self, days_ago: u16) {
161 self.days_ago = days_ago;
162 }
163 pub fn set_time_point_hour(&mut self, time_point_hour: u8) {
164 self.time_point_hour = time_point_hour;
165 }
166}
167
168async fn do_compress(
177 count: u32,
178 params: &LogCompressParams,
179) -> Result<bool, ServiceError> {
180 const OFFSET: u32 = 60;
181 if !count.is_multiple_of(OFFSET)
182 || params.time_point_hour != chrono::Local::now().hour() as u8
183 {
184 return Ok(false);
185 }
186
187 let days_ago = if params.days_ago == 0 {
188 DEFAULT_DAYS_AGO
189 } else {
190 params.days_ago
191 };
192 let access_before = SystemTime::now()
193 .checked_sub(Duration::from_secs(24 * 3600 * days_ago as u64))
194 .ok_or_else(|| ServiceError::Invalid {
195 message: "Failed to calculate access time".to_string(),
196 })?;
197 let compression_exts = [GZIP_EXT.to_string(), ZSTD_EXT.to_string()];
198 let unique_paths: HashSet<String> = params.dirs.iter().cloned().collect();
199
200 for path in unique_paths {
201 for entry in WalkDir::new(path).into_iter().filter_map(|e| e.ok()) {
202 let ext = entry
203 .path()
204 .extension()
205 .unwrap_or_default()
206 .to_string_lossy()
207 .to_string();
208 if compression_exts.contains(&ext) {
209 continue;
210 }
211 let Ok(metadata) = entry.metadata() else {
212 continue;
213 };
214 let Ok(accessed) = metadata.accessed() else {
215 continue;
216 };
217 if accessed > access_before {
218 continue;
219 }
220 let start = Instant::now();
221 let result = if params.compression == "gzip" {
222 gzip_compress(entry.path(), params.level)
223 } else {
224 zstd_compress(entry.path(), params.level)
225 };
226 let file = entry.path().to_string_lossy().to_string();
227 match result {
228 Err(e) => {
229 error!(
230 target: LOG_TARGET,
231 error = %e,
232 file,
233 "compress log fail"
234 );
235 },
236 Ok((size, original_size)) => {
237 let elapsed = format!("{}ms", start.elapsed().as_millis());
238 info!(
239 target: LOG_TARGET,
240 file,
241 elapsed,
242 original_size = ByteSize::b(original_size).to_string(),
243 size = ByteSize::b(size).to_string(),
244 "compress log success",
245 );
246 let _ = fs::remove_file(entry.path());
248 },
249 }
250 }
251 }
252 Ok(true)
253}
254
255struct LogCompressTask {
256 params: LogCompressParams,
257}
258
259#[async_trait]
260impl BackgroundTask for LogCompressTask {
261 async fn execute(&self, count: u32) -> Result<bool, ServiceError> {
262 do_compress(count, &self.params).await?;
263 Ok(true)
264 }
265}
266
267pub fn new_log_compress_service(
275 params: LogCompressParams,
276) -> Box<dyn BackgroundTask> {
277 Box::new(LogCompressTask { params })
278}
279
280#[derive(Default, Debug)]
282pub struct LoggerParams {
283 pub log: String,
284 pub level: String,
285 pub capacity: u64,
286 pub json: bool,
287}
288
289fn new_file_writer(params: &LoggerParams) -> Result<(BoxMakeWriter, String)> {
290 let rolling_file_writer = new_rolling_file_writer(¶ms.log)?;
291 let file = params
292 .log
293 .split_once('?')
294 .unwrap_or((params.log.as_str(), ""))
295 .0;
296
297 let filepath = Path::new(&file);
298 let dir = if filepath.is_dir() {
299 filepath
300 } else {
301 filepath.parent().ok_or_else(|| Error::Invalid {
302 message: "parent of file log is invalid".to_string(),
303 })?
304 };
305
306 let writer = if params.capacity < MIN_BUFFER_CAPACITY {
307 BoxMakeWriter::new(rolling_file_writer.writer)
308 } else {
309 let w = io::BufWriter::with_capacity(
311 params.capacity as usize,
312 rolling_file_writer.writer,
313 );
314 BoxMakeWriter::new(Mutex::new(w))
315 };
316 Ok((writer, dir.to_string_lossy().to_string()))
317}
318
319pub fn logger_try_init(
327 params: LoggerParams,
328) -> Result<(LoggerReloadHandle, Option<String>)> {
329 let level = if params.level.is_empty() {
330 std::env::var("RUST_LOG").unwrap_or("INFO".to_string())
331 } else {
332 params.level.clone()
333 };
334
335 let seconds = chrono::Local::now().offset().local_minus_utc();
336 let hours = (seconds / 3600) as i8;
337 let minutes = ((seconds % 3600) / 60) as i8;
338 let is_dev = cfg!(debug_assertions);
339
340 let initial_filter = new_env_filter(&level);
341 let (filter_layer, reload_handle) = Layer::new(initial_filter);
342 let registry = tracing_subscriber::registry().with(filter_layer);
343
344 let mut log_path = None;
345 let mut log_type = "stdio";
346 let writer = if params.log.is_empty() {
347 BoxMakeWriter::new(std::io::stderr)
348 } else if params.log.starts_with("syslog://") {
349 #[cfg(unix)]
350 {
351 new_syslog_writer(¶ms.log)?
352 }
353 #[cfg(not(unix))]
354 {
355 return Err(Error::Invalid {
356 message: "syslog is only supported on Unix systems".to_string(),
357 });
358 }
359 } else {
360 log_type = "file";
361 let (w, dir) = new_file_writer(¶ms)?;
362 log_path = Some(dir);
363 w
364 };
365 let timer = tracing_subscriber::fmt::time::OffsetTime::new(
366 time::UtcOffset::from_hms(hours, minutes, 0)
367 .unwrap_or(time::UtcOffset::UTC),
368 time::format_description::well_known::Rfc3339,
369 );
370
371 if params.json {
372 let fmt_layer = tracing_subscriber::fmt::layer()
373 .with_ansi(false)
374 .with_timer(timer)
375 .with_target(is_dev)
376 .with_writer(writer)
377 .json();
378 let subscriber = registry.with(fmt_layer);
379 let boxed_subscriber: Box<dyn Subscriber + Send + Sync> =
380 Box::new(subscriber);
381
382 tracing::subscriber::set_global_default(boxed_subscriber).map_err(
384 |e| Error::Invalid {
385 message: e.to_string(),
386 },
387 )?
388 } else {
389 let fmt_layer = tracing_subscriber::fmt::layer()
390 .with_ansi(is_dev) .with_timer(timer)
392 .with_target(is_dev)
393 .with_writer(writer);
394
395 let subscriber = registry.with(fmt_layer);
396 let boxed_subscriber: Box<dyn Subscriber + Send + Sync> =
397 Box::new(subscriber);
398
399 tracing::subscriber::set_global_default(boxed_subscriber).map_err(
400 |e| Error::Invalid {
401 message: e.to_string(),
402 },
403 )?
404 }
405
406 info!(
407 target: LOG_TARGET,
408 capacity = params.capacity,
409 log_type,
410 level = level.to_string(),
411 json_format = params.json,
412 utc_offset = chrono::Local::now().offset().to_string(),
413 "init tracing subscriber success",
414 );
415
416 Ok((reload_handle, log_path))
417}