1#[cfg(unix)]
16use super::syslog::new_syslog_writer;
17use super::{Error, LOG_CATEGORY};
18use bytesize::ByteSize;
19use chrono::Timelike;
20use flate2::write::GzEncoder;
21use flate2::Compression;
22use pingap_core::convert_query_map;
23use pingap_core::Error as ServiceError;
24use pingap_core::SimpleServiceTaskFuture;
25use std::fs;
26use std::io;
27#[cfg(unix)]
28use std::os::unix::fs::MetadataExt;
29#[cfg(windows)]
30use std::os::windows::fs::MetadataExt;
31use std::path::{Path, PathBuf};
32use std::sync::Mutex;
33use std::time::{Duration, SystemTime};
34use tracing::{error, info, Level};
35use tracing_subscriber::fmt::writer::BoxMakeWriter;
36use walkdir::WalkDir;
37
38const DEFAULT_COMPRESSION_LEVEL: u8 = 9;
39const DEFAULT_DAYS_AGO: u16 = 7;
40const MIN_BUFFER_CAPACITY: u64 = 4096;
43
44static GZIP_EXT: &str = "gz";
45static ZSTD_EXT: &str = "zst";
46
47type Result<T, E = Error> = std::result::Result<T, E>;
48
49fn zstd_compress(file: &Path, level: u8) -> Result<(u64, u64)> {
58 let level = if level == 0 {
59 DEFAULT_COMPRESSION_LEVEL
60 } else {
61 level
62 };
63 let zst_file = file.with_extension(ZSTD_EXT);
64 let mut original_file =
65 fs::File::open(file).map_err(|e| Error::Io { source: e })?;
66 let file = fs::OpenOptions::new()
67 .read(true)
68 .write(true)
69 .create_new(true)
70 .open(&zst_file)
71 .map_err(|e| Error::Io { source: e })?;
72
73 let mut encoder = zstd::stream::Encoder::new(&file, level as i32)
74 .map_err(|e| Error::Io { source: e })?;
75 let original_size = io::copy(&mut original_file, &mut encoder)
76 .map_err(|e| Error::Io { source: e })?;
77 encoder.finish().map_err(|e| Error::Io { source: e })?;
78 #[cfg(unix)]
79 let size = file.metadata().map(|item| item.size()).unwrap_or_default();
80 #[cfg(windows)]
81 let size = file
82 .metadata()
83 .map(|item| item.file_size())
84 .unwrap_or_default();
85 Ok((size, original_size))
86}
87
88fn gzip_compress(file: &Path, level: u8) -> Result<(u64, u64)> {
97 let gzip_file = file.with_extension(GZIP_EXT);
98 let mut original_file =
99 fs::File::open(file).map_err(|e| Error::Io { source: e })?;
100 let file = fs::OpenOptions::new()
101 .read(true)
102 .write(true)
103 .create_new(true)
104 .open(&gzip_file)
105 .map_err(|e| Error::Io { source: e })?;
106 let level = if level == 0 {
107 Compression::best()
108 } else {
109 Compression::new(level as u32)
110 };
111 let mut encoder = GzEncoder::new(&file, level);
112 let original_size = io::copy(&mut original_file, &mut encoder)
113 .map_err(|e| Error::Io { source: e })?;
114 encoder.finish().map_err(|e| Error::Io { source: e })?;
115 #[cfg(unix)]
116 let size = file.metadata().map(|item| item.size()).unwrap_or_default();
117 #[cfg(windows)]
118 let size = file
119 .metadata()
120 .map(|item| item.file_size())
121 .unwrap_or_default();
122 Ok((size, original_size))
123}
124
125#[derive(Debug, Clone)]
127struct LogCompressParams {
128 compression: String,
129 path: PathBuf,
130 level: u8,
131 days_ago: u16,
132 time_point_hour: u8,
133}
134
135impl Default for LogCompressParams {
136 fn default() -> Self {
137 Self {
138 compression: String::new(),
139 path: PathBuf::new(),
140 level: DEFAULT_COMPRESSION_LEVEL,
141 days_ago: DEFAULT_DAYS_AGO,
142 time_point_hour: 0,
143 }
144 }
145}
146
147async fn do_compress(
156 count: u32,
157 params: LogCompressParams,
158) -> Result<bool, ServiceError> {
159 const OFFSET: u32 = 60;
160 if count % OFFSET != 0
161 || params.time_point_hour != chrono::Local::now().hour() as u8
162 {
163 return Ok(false);
164 }
165
166 let days_ago = if params.days_ago == 0 {
167 DEFAULT_DAYS_AGO
168 } else {
169 params.days_ago
170 };
171 let access_before = SystemTime::now()
172 .checked_sub(Duration::from_secs(24 * 3600 * days_ago as u64))
173 .ok_or_else(|| ServiceError::Invalid {
174 message: "Failed to calculate access time".to_string(),
175 })?;
176 let compression_exts = [GZIP_EXT.to_string(), ZSTD_EXT.to_string()];
177 for entry in WalkDir::new(¶ms.path)
178 .into_iter()
179 .filter_map(|e| e.ok())
180 {
181 let ext = entry
182 .path()
183 .extension()
184 .unwrap_or_default()
185 .to_string_lossy()
186 .to_string();
187 if compression_exts.contains(&ext) {
188 continue;
189 }
190 let Ok(metadata) = entry.metadata() else {
191 continue;
192 };
193 let Ok(accessed) = metadata.accessed() else {
194 continue;
195 };
196 if accessed > access_before {
197 continue;
198 }
199 let start = SystemTime::now();
200 let result = if params.compression == "gzip" {
201 gzip_compress(entry.path(), params.level)
202 } else {
203 zstd_compress(entry.path(), params.level)
204 };
205 let file = entry.path().to_string_lossy().to_string();
206 match result {
207 Err(e) => {
208 error!(
209 category = LOG_CATEGORY,
210 error = %e,
211 file,
212 "compress log fail"
213 );
214 },
215 Ok((size, original_size)) => {
216 let elapsed = format!("{}ms", pingap_util::elapsed_ms(start));
217 info!(
218 category = LOG_CATEGORY,
219 file,
220 elapsed,
221 original_size = ByteSize::b(original_size).to_string(),
222 size = ByteSize::b(size).to_string(),
223 "compress log success",
224 );
225 let _ = fs::remove_file(entry.path());
227 },
228 }
229 }
230 Ok(true)
231}
232
233fn new_log_compress_service(
241 params: LogCompressParams,
242) -> Option<(String, SimpleServiceTaskFuture)> {
243 let task: SimpleServiceTaskFuture = Box::new(move |count: u32| {
244 Box::pin({
245 let value = params.clone();
246 async move {
247 let value = value.clone();
248 do_compress(count, value).await
249 }
250 })
251 });
252 Some(("log_compress".to_string(), task))
253}
254
255#[derive(Default, Debug)]
257pub struct LoggerParams {
258 pub log: String,
259 pub level: String,
260 pub capacity: u64,
261 pub json: bool,
262}
263
264fn new_file_writer(
265 params: &LoggerParams,
266) -> Result<(BoxMakeWriter, Option<(String, SimpleServiceTaskFuture)>)> {
267 let mut file = pingap_util::resolve_path(¶ms.log);
268 let mut rolling_type = "".to_string();
269 let mut compression = "".to_string();
270 let mut level = 0;
271 let mut days_ago = 0;
272 let mut time_point_hour = 0;
273 let mut task = None;
274 if let Some((_, query)) = params.log.split_once('?') {
275 file = file.replace(&format!("?{query}"), "");
276 let m = convert_query_map(query);
277 if let Some(value) = m.get("rolling") {
278 rolling_type = value.to_string();
279 }
280 if let Some(value) = m.get("compression") {
281 compression = value.to_string();
282 }
283 if let Some(value) = m.get("level") {
284 level = value.parse::<u8>().unwrap_or_default();
285 }
286 if let Some(value) = m.get("days_ago") {
287 days_ago = value.parse::<u16>().unwrap_or_default();
288 }
289 if let Some(value) = m.get("time_point_hour") {
290 time_point_hour = value.parse::<u8>().unwrap_or_default();
291 }
292 }
293
294 let filepath = Path::new(&file);
295 let dir = if filepath.is_dir() {
296 filepath
297 } else {
298 filepath.parent().ok_or_else(|| Error::Invalid {
299 message: "parent of file log is invalid".to_string(),
300 })?
301 };
302 fs::create_dir_all(dir).map_err(|e| Error::Io { source: e })?;
303 if !compression.is_empty() {
304 task = new_log_compress_service(LogCompressParams {
305 compression,
306 path: dir.to_path_buf(),
307 days_ago,
308 level,
309 time_point_hour,
310 });
311 }
312
313 let filename = if filepath.is_dir() {
314 "".to_string()
315 } else {
316 filepath
317 .file_name()
318 .ok_or_else(|| Error::Invalid {
319 message: "file log is invalid".to_string(),
320 })?
321 .to_string_lossy()
322 .to_string()
323 };
324 let file_appender = match rolling_type.as_str() {
325 "minutely" => tracing_appender::rolling::minutely(dir, filename),
326 "hourly" => tracing_appender::rolling::hourly(dir, filename),
327 "never" => tracing_appender::rolling::never(dir, filename),
328 _ => tracing_appender::rolling::daily(dir, filename),
329 };
330
331 let writer = if params.capacity < MIN_BUFFER_CAPACITY {
332 BoxMakeWriter::new(file_appender)
333 } else {
334 let w = io::BufWriter::with_capacity(
336 params.capacity as usize,
337 file_appender,
338 );
339 BoxMakeWriter::new(Mutex::new(w))
340 };
341 Ok((writer, task))
342}
343
344pub fn logger_try_init(
352 params: LoggerParams,
353) -> Result<Option<(String, SimpleServiceTaskFuture)>> {
354 let level = if params.level.is_empty() {
355 std::env::var("RUST_LOG").unwrap_or("INFO".to_string())
356 } else {
357 params.level.clone()
358 };
359
360 let level = match level.to_lowercase().as_str() {
361 "trace" => Level::TRACE,
362 "debug" => Level::DEBUG,
363 "warn" => Level::WARN,
364 "error" => Level::ERROR,
365 _ => Level::INFO,
366 };
367
368 let seconds = chrono::Local::now().offset().local_minus_utc();
369 let hours = (seconds / 3600) as i8;
370 let minutes = ((seconds % 3600) / 60) as i8;
371 let is_dev = cfg!(debug_assertions);
372
373 let builder = tracing_subscriber::fmt()
374 .with_max_level(level)
375 .with_ansi(is_dev)
376 .with_timer(tracing_subscriber::fmt::time::OffsetTime::new(
377 time::UtcOffset::from_hms(hours, minutes, 0).unwrap(),
378 time::format_description::well_known::Rfc3339,
379 ))
380 .with_target(is_dev);
381 let mut task = None;
382 let mut log_type = "stdio";
383 let writer = if params.log.is_empty() {
384 BoxMakeWriter::new(std::io::stderr)
385 } else if params.log.starts_with("syslog://") {
386 #[cfg(unix)]
387 {
388 new_syslog_writer(¶ms.log)?
389 }
390 #[cfg(not(unix))]
391 {
392 return Err(Error::Invalid {
393 message: "syslog is only supported on Unix systems".to_string(),
394 });
395 }
396 } else {
397 log_type = "file";
398 let (w, t) = new_file_writer(¶ms)?;
399 task = t;
400 w
401 };
402 if params.json {
403 builder
404 .event_format(tracing_subscriber::fmt::format::json())
405 .with_writer(writer)
406 .init();
407 } else {
408 builder.with_writer(writer).init();
409 }
410
411 info!(
412 category = LOG_CATEGORY,
413 capacity = params.capacity,
414 log_type,
415 level = level.to_string(),
416 json_format = params.json,
417 utc_offset = chrono::Local::now().offset().to_string(),
418 "init tracing subscriber success",
419 );
420
421 Ok(task)
422}