rust_rsm/rsm/xlog/
mod.rs

//! # xlog
//! xlog comprises one xlog server and multiple xlog clients, each of which can be configured with a different log filter level.
//! An application's log message is sent to the server only when its log level is higher than the log filter level.
//! Each application module can create its own xlog client instance and then call the client's methods to process logs.
//! The xlog client uses a UDP socket to send JSON-formatted messages to the xlog server;
//! after receiving a client's log message, the xlog server filters it by the configured filter level and then writes the logs to disk periodically.
//! 
8
9use self::syslog::sys_log_client_t;
10use super::*;
11use serde::{Deserialize, Serialize};
12use std::net::{IpAddr,SocketAddr,UdpSocket};
13use crate::common::{self,tsmap::TsHashMap};
14use std::collections::{VecDeque};
15use std::io::{self,Read,Write};
16use std::fs;
17use libdeflater;
18
19pub mod syslog;
20pub mod xlogger;
21pub mod xlog_server;
22
/// Numeric log severity. The constants below use the same 0..=7 numbering as
/// the syslog severity codes: the smaller the value, the more severe the level.
pub type LOG_LEVEL = i32;

pub const LOG_LEVEL_EMERGENCY: LOG_LEVEL = 0;
pub const LOG_LEVEL_ALERT: LOG_LEVEL = 1;
pub const LOG_LEVEL_CRITICAL: LOG_LEVEL = 2;
pub const LOG_LEVEL_ERROR: LOG_LEVEL = 3;
pub const LOG_LEVEL_WARNING: LOG_LEVEL = 4;
pub const LOG_LEVEL_NOTICE: LOG_LEVEL = 5;
pub const LOG_LEVEL_INFO: LOG_LEVEL = 6;
pub const LOG_LEVEL_DEBUG: LOG_LEVEL = 7;
/// Largest defined level value (same value as `LOG_LEVEL_DEBUG`).
pub const LOG_LEVEL_MAX: LOG_LEVEL = 7;

// Log categories; presumably the values stored in `InnerLogMsg::LogType` -- verify against callers.
pub const LOG_TYPE_SYSTEM: i32 = 0;
pub const LOG_TYPE_OPERATION: i32 = 1;
pub const LOG_TYPE_SECURITY: i32 = 2;

pub const LOG_DEF_PERSISTENT_LEVEL: LOG_LEVEL    = LOG_LEVEL_WARNING; // default level at which logs are persisted to disk
pub const LOG_DEF_CONSOLE_OUTPUT_LEVEL: LOG_LEVEL  = LOG_LEVEL_INFO; // default level for console output (per the constant's name)

/// Default UDP port of the xlog service; used by `log_service_config_t::new_default`.
pub const LOG_DEF_SERVICE_PORT: u16 = 61000;
// NOTE(review): the IANA-registered syslog UDP port is 514 (RFC 5426), not 512 -- confirm this value is intentional.
pub const SYSLOG_DEF_UDP_PORT:u16=512;
44
/// Log record structure passed around internally.
///
/// Derives `Serialize`/`Deserialize`; presumably this is the JSON payload that
/// clients send to the server -- verify against the client/server code.
#[derive(Clone, Serialize, Deserialize)]
pub struct InnerLogMsg {
    /// Name of the module the record belongs to (printed by `LogFormat`).
    ModuleName: String,
	/// Timestamp of the record; (de)serialized as an RFC 3339 string.
	#[serde(with = "time::serde::rfc3339")]
    OccureTime: common::rsm_time_t,
    /// Log category; presumably one of the `LOG_TYPE_*` constants -- TODO confirm.
    LogType: i32,
    /// Severity of the record.
    LogLevel: LOG_LEVEL,
    /// Position string printed after the module name by `LogFormat`;
    /// presumably a source location -- TODO confirm.
    Position: String,
    /// Error code attached to the record (printed as `ErrorCode="..."`).
    ErrCode: errcode::RESULT,
    /// Human-readable description of the event.
    LogDesc: String,
    /// Additional free-form context, printed last by `LogFormat`.
    Context: String,
}
58
/// Log listener, for applications that want to implement their own log
/// storage and reporting functions.
pub trait LogListener {
	/// Called with a log record and its sequence number.
	///
	/// NOTE(review): the meaning of the returned `bool` is not visible in this
	/// file -- confirm against the code that invokes the listener.
	fn NotifyLog(&mut self,msg:&InnerLogMsg, msgSeq:u64)->bool;
}
63
64pub (crate) fn LogFormat(msgStru:&InnerLogMsg, msgId:u64,sender:&SocketAddr)->String {
65	/*SYSLOG-MSG = HEADER SP STRUCTURED-DATA [SP MSG]
66	  HEADER = PRI VERSION SP TIMESTAMP SP HOSTNAME
67	  SP APP-NAME SP PROCID SP MSGID */
68	let strHdr = format!("<{}> 1 {} {}:{} {} {} {}", 1*8+msgStru.LogLevel, 
69        msgStru.OccureTime.to_string(),
70		sender.ip(), sender.port(), msgStru.ModuleName, msgStru.Position, msgId);
71
72	let strMsg = format!("{} ErrorCode=\"{}\" {} {}\n", strHdr,msgStru.ErrCode, msgStru.LogDesc, msgStru.Context);
73	return strMsg
74}
75
/// Record describing one log client module; stored per module name in
/// `log_service_t::logMuduleControl`.
#[derive(Clone,Debug)]
pub struct log_client_t {
	/// Level setting for persisting this client's logs.
	persistent_log_level:LOG_LEVEL,
	/// Level setting for printing this client's logs to the console.
	console_log_level:LOG_LEVEL,
	name:String, // name of the client module
	/// Socket address associated with the client.
	addr:SocketAddr,
	logPackets:u64,
    logBytes:u64,  // counters: number of log messages sent and their total bytes
}
85
// Defaults used by `log_service_config_t::new_default`.
/// Default `maxStorageSize`: 2 MiB.
const LOG_DEF_STORAGE_SIZE:u64=2*1024*1024;
/// Default `logFilePath`: the current working directory.
const LOG_DEF_PATH:&str = "./";
/// Default `logFilePrefix`.
const LOG_DEF_PREFIX:&str = "rsm_xlog";
89
/// Configuration used to create a log service.
#[derive(Clone,Debug,Serialize, Deserialize)]
pub struct log_service_config_t {
	/// Level setting for persisting logs to disk.
	pub persistentLevel:LOG_LEVEL,
	/// Level setting for console output.
	pub consoleLevel:LOG_LEVEL,
	/// Maximum disk size of the log file, in bytes; if the stored log file exceeds this value, the file will be truncated.
	pub maxStorageSize:u64, 
	/// Maximum storage period.
	/// NOTE(review): the unit is not visible in this file -- confirm against the server code.
	pub maxStoragePeriod:i32,  
    /// Path for the log files (defaults to `"./"`).
    pub logFilePath:String,
	/// Prefix for the log files (defaults to `"rsm_xlog"`).
	pub logFilePrefix:String, 
    /// Address of the log service itself.
    pub self_addr:SocketAddr,
	/// Optional address of a syslog server; `None` by default.
	pub syslog_server:Option<SocketAddr>,
}
103impl log_service_config_t {
104	pub fn new_default()->Self {
105		let def_addr=SocketAddr::new(IpAddr::from([127,0,0,1]),LOG_DEF_SERVICE_PORT);
106		return Self { persistentLevel:LOG_LEVEL_ERROR, consoleLevel: LOG_LEVEL_ERROR, 
107			maxStorageSize: LOG_DEF_STORAGE_SIZE, maxStoragePeriod: 2, 
108			logFilePath:LOG_DEF_PATH.to_string(), logFilePrefix: LOG_DEF_PREFIX.to_string(), 
109			self_addr:def_addr, syslog_server: None 
110		}
111	}
112}
113
/// Log service state; the log service should be created only once in any system.
pub struct log_service_t  {
    /// Configuration the service was created with.
    service_conf:log_service_config_t,
	/// UDP socket of the service.
	sck:UdpSocket,
	/// Currently open log file, if any.
	curLogFile:Option<fs::File>, 
	/// Presumably the number of messages not yet synced to disk -- TODO confirm.
	unSyncedMsg:i32,      
	/// Presumably the time of the last write to the log file -- TODO confirm.
	lastWriteTime:common::datetime_t,
	/// Queue of log strings; presumably formatted lines awaiting the periodic disk write -- TODO confirm.
	queue:VecDeque<String>,
	/// Per-client records, keyed by a string (presumably the module name).
	logMuduleControl:TsHashMap<String,log_client_t>, 
	/// Index from socket address to a string (presumably the module name).
	logModuleIndex:TsHashMap<SocketAddr,String>,  
	/// Registered listeners, held as raw pointers.
	/// NOTE(review): nothing in this file shows who owns the pointees or how long they live -- confirm.
	logListener:TsHashMap<String,*mut dyn LogListener>,
	/// Packet counter.
	logPackets:u64, 
    /// Byte counter.
    LogBytes:u64,
	logSeq:u64, // log message sequence number
	/// Handle to standard output.
	stdout:io::Stdout,
	/// Optional syslog client.
	sys_client:Option<sys_log_client_t>,
}
131
/// Log client instance; it should be created before using the log functions.
pub struct xlogger_t {
    /// Name of the module that owns this client.
    module_name: String,
    /// IP address of this client.
    self_ip: IpAddr,
    /// Port of this client.
    self_port: u16,
    /// Address of the xlog server.
    server_addr: SocketAddr,
    /// UDP socket of the client, if one is currently held.
    socket: Option<UdpSocket>,
    /// Log level configured for this client.
    level: LOG_LEVEL,
    /// Counter of sent packets.
    sentPackets: u64,
    /// Counter of sent bytes.
    sentbytes: u64,
}
143
144
145pub(crate) fn compressFile(fileIn:&String, fileOut:&String)->errcode::RESULT {
146	let mut fp1 = match fs::OpenOptions::new().read(true).open(fileIn) {
147		Err(_)=>return errcode::ERROR_OPEN_FILE,
148		Ok(f)=>f,
149	};
150
151	
152	let mut fp2 = match fs::OpenOptions::new().create_new(true).write(true).open(fileOut) {
153		Err(_)=>return errcode::ERROR_OPEN_FILE,
154		Ok(f)=>f,
155	};
156	let stats = match fp1.metadata() {
157		Err(_)=>return errcode::ERROR_OPEN_FILE,
158		Ok(s)=>s,
159	};
160	let complvl=libdeflater::CompressionLvl::default();
161	let mut comp =libdeflater::Compressor::new(complvl);
162	comp.gzip_compress_bound(stats.len() as usize);
163	let mut vec_buf_in = Vec::with_capacity(stats.len() as usize);
164	let mut vec_buf_out = Vec::with_capacity(stats.len() as usize);
165	unsafe {
166		vec_buf_in.set_len(stats.len() as usize);
167		vec_buf_out.set_len(stats.len() as usize);
168	}
169	let n_bytes = match fp1.read(vec_buf_in.as_mut_slice()) {
170		Err(_)=>return errcode::ERROR_BUFFER_TOO_SMALL,
171		Ok(l)=>l,
172	};
173	let comp_len = match comp.gzip_compress(&vec_buf_in.as_slice()[0..n_bytes], vec_buf_out.as_mut_slice()) {
174		Err(_)=>return errcode::ERROR_BUFFER_TOO_SMALL,
175		Ok(l)=>l,		
176	};
177
178	fp2.write(&vec_buf_out.as_slice()[0..comp_len]);
179	fp2.flush();
180
181	return errcode::RESULT_SUCCESS
182
183}
184