rust_rsm/rsm/xlog/
xlog_server.rs

1#![allow(non_camel_case_types)]
2#![allow(non_snake_case)]
3#![allow(non_upper_case_globals)]
4#![allow(dead_code)]
5
6use super::*;
7use crate::common::{errcode,tsmap::TsHashMap};
8use std::net::{SocketAddr,UdpSocket,IpAddr};
9use std::time::{self,Duration,SystemTime};
10use std::fs::{self,OpenOptions};
11use std::collections::{VecDeque};
12use std::io::{self,Read,Write};
13use std::thread;
14
/// Initial capacity reserved for the in-memory log queue.
const MAX_LOG_QUEUE_LEN: usize = 2048;
/// Size in bytes of the buffer used to receive a single log datagram.
const MAX_LOG_MSG_LEN: usize = 65000;
/// Not referenced in the visible part of this file.
const MAX_OOB_MSG_LEN: usize = 4096;
/// Count of written-but-unsynced messages that forces a sync to disk.
const MAX_UNSYNCED_MSG: i32 = 256;
/// Period, in seconds, after which unsynced log data is flushed to disk.
const LOG_SYNC_DISK_PERIOD: u64 = 2;
/// Exclusive upper bound on how many days back the clean task looks for log files.
const MAX_LOG_FILE: usize = 365;
21
22impl log_service_t {
23    pub fn new(conf:&log_service_config_t)->Option<log_service_t> {
24        let sck=match UdpSocket::bind(conf.self_addr) {
25            Err(_)=>return None,
26            Ok(s)=>s,
27        };
28
29        let mut service = Self {
30            service_conf:conf.clone(),
31            sck:sck,
32            curLogFile:None,
33            unSyncedMsg:0,      //已经写入文件,但是没有存盘的消息计数
34            lastWriteTime:common::get_datetime_from_std(&SystemTime::now()),
35            queue:VecDeque::with_capacity(MAX_LOG_QUEUE_LEN),
36            logMuduleControl:TsHashMap::new(128), //以名称索引到日志的客户模块信息
37            logModuleIndex:TsHashMap::new(128),   //以IP:端口索引到名称
38            logListener:TsHashMap::new(128),
39            logPackets:0, 
40            LogBytes:0,
41            logSeq:1,
42			stdout:io::stdout(),
43			sys_client:None,
44        };
45		if let Some(syslog_addr) = conf.syslog_server {
46			if let Ok(mut syslog)=syslog::sys_log_client_t::new(&conf.self_addr) {
47				syslog.set_server_addr(&syslog_addr);
48				service.sys_client=Some(syslog);
49			}
50
51		}
52
53        return Some(service)
54    }
55    /*根据自动生成的文件名,打开日志文件*/
56    pub fn openLogFile(&mut self)->errcode::RESULT {    
57        let tm = common::get_datetime_from_std(&SystemTime::now());
58    
59        let fileName = format!("{}{}",self.service_conf.logFilePath, self.getLogFileName(&tm));
60    
61        let fd = match OpenOptions::new().create(true).write(true).append(true).open(fileName.clone()) {
62            Ok(f)=>f,
63            Err(e)=>{
64				println!("open file {} error,err={}",fileName,e);
65				return errcode::ERROR_OPEN_FILE;
66			},
67        };
68        self.curLogFile=Some(fd);
69        return errcode::RESULT_SUCCESS
70    }
71
72    /*给定一个时间,返回一个规整的日志文件名称,一般为prefix+"_"+YYYYMMDD+".log"*/
73    fn getLogFileName(&self,tm:&common::datetime_t)->String {
74	    let fileName = format!("{}_{:#04}{:#02}{:#02}.log", self.service_conf.logFilePrefix, 
75			tm.get_year(), tm.get_mon_in_year(), tm.get_day_in_mon());
76	    return fileName
77    }
78
79    /*Log Service的内部数据获取*/
80    pub fn GetLogFilePath(&self)->String {
81	    return self.service_conf.logFilePath.clone()
82    }
83
84    ///init log service
85    pub fn init(&mut self) {
86
87    }
88    /*设置全局日志级别,分为存盘级别和控制台输出级别*/
89    fn SetGlobalPersitentLogLevel(&mut self,newLevel:LOG_LEVEL) {
90	    self.service_conf.persistentLevel = newLevel;
91
92    }
93
94    fn SetGlobalConsoleLogLevel(&mut self,newLevel:LOG_LEVEL) {
95	    self.service_conf.consoleLevel = newLevel;
96    }
97
98/*设置模块级的持久化日志级别*/
99    fn SetModulePersitentLogLevel(&mut self,name:&String, newLevel:LOG_LEVEL)->errcode::RESULT {
100		if let Some(module) = self.logMuduleControl.get_mut(name) {
101			module.persistent_log_level = newLevel;
102		} else {
103			let module = log_client_t{
104				name: name.clone(),
105				persistent_log_level:newLevel,
106				console_log_level: LOG_DEF_CONSOLE_OUTPUT_LEVEL,
107				addr:SocketAddr::new(IpAddr::from([127,0,0,1]),0),
108				logBytes:0,
109				logPackets:0,
110			};
111			self.logMuduleControl.insert(name.clone(),module);
112		}
113	    return errcode::RESULT_SUCCESS
114
115    }
116
117/*设置模块级的控制台输出日志级别*/
118    fn SetModuleConsoleLogLevel(&mut self,name:&String, newLevel:LOG_LEVEL)->errcode::RESULT {
119		if let Some(module) = self.logMuduleControl.get_mut(name) {
120			module.console_log_level = newLevel;
121		} else {
122			let module = log_client_t{
123				name: name.clone(),
124				persistent_log_level:LOG_DEF_PERSISTENT_LEVEL,
125				console_log_level: newLevel,
126				addr:SocketAddr::new(IpAddr::from([127,0,0,1]),0),
127				logBytes:0,
128				logPackets:0,
129			};
130			self.logMuduleControl.insert(name.clone(),module);
131		}
132
133	    return errcode::RESULT_SUCCESS
134    }
135
136	/*设置日志存储的参数,0表示不修改*/
137    fn SetLogPersitentParam(&mut self,maxDiskSize:u64, maxPeriod:i32) {
138	    self.service_conf.maxStorageSize = maxDiskSize;
139
140	    if maxPeriod >= 0 {
141		    self.service_conf.maxStoragePeriod = maxPeriod;
142	    }
143    }
144
145	pub fn set_syslog_addr(&mut self,addr:&SocketAddr)->errcode::RESULT {
146		if let Some(log)=&mut self.sys_client {
147			log.set_server_addr(addr);
148			return errcode::RESULT_SUCCESS;
149		}
150		return errcode::ERROR_NOT_INITIALIZED;
151	}
152
153	/*处理监听器的回调,被调用者返回false,则表示流程结束*/
154	fn RegisterLogModule<'a>(&'a mut self,name:&String, addr:&SocketAddr)->Option<&'a mut log_client_t> {
155		if self.logMuduleControl.contains_key(name) {
156			 match self.logMuduleControl.get_mut(name) {
157				None=>return None,
158				Some(m)=> {
159				if m.addr.port() == 0 {				
160					self.logModuleIndex.insert(addr.clone(),name.clone());
161					m.addr=addr.clone();
162				}
163				return Some(m);
164			},
165			}
166		}
167
168		self.logModuleIndex.insert(addr.clone(),name.clone());
169		let module=log_client_t{
170				persistent_log_level: LOG_LEVEL_WARNING, console_log_level: LOG_LEVEL_INFO,
171				name: name.clone(), addr:addr.clone(),
172				logBytes:0,
173				logPackets:0,};
174		self.logMuduleControl.insert(name.clone(),module);
175		return self.logMuduleControl.get_mut(name);
176		
177	}
178	/*处理监听器的回调,被调用者返回false,则表示流程结束*/
179	fn processListener(&mut self,msgStru:&InnerLogMsg, msgSeq:u64)->bool {
180		for (_, v) in self.logListener.iter_mut() {
181			let callback = unsafe {&mut *(*v)};
182			let ret = callback.NotifyLog(msgStru, msgSeq);
183			if !ret {
184				return false;
185			}
186		}
187		self.logListener.end_iter();
188		return true
189	}
190
191	/*日志接收处理,JSON格式
192首先进行回调处理,然后进行存盘处理;最后发送给SysLog Server*/
193	pub fn recvLog(&mut self) {
194		println!("LogServer: Begin Receiving Log Message");
195		let _ = self.sck.set_read_timeout(Some(Duration::from_millis(50)));
196		let mut recv_buf=[0u8;MAX_LOG_MSG_LEN];
197		loop {
198			let (len,addr)=match self.sck.recv_from(&mut recv_buf[..]) {
199				Err(_)=>continue,
200				Ok((l,a))=>(l,a),
201			};
202			//println!("log server recv message from {},len={},msg = {}",addr,len,String::from_utf8_lossy(&recv_buf[0..len]));
203			self.logPackets+=1;
204			self.LogBytes+=len as u64;
205		let logStru = match serde_json::from_slice::<InnerLogMsg>(&recv_buf[0..len]) {
206			Err(_e)=> {
207				println!("log server decode message err {},len={}",_e,len);
208				continue;
209			},
210			Ok(d)=>d,
211		};
212		self.processLog(&logStru, &recv_buf[0..len], &addr);
213
214		}
215
216	}
217
218	/*内部汇总输出日志的函数*/
219	fn innerOutputLog(&mut self,msg:&InnerLogMsg, m:&mut log_client_t,formated_msg:&String) {		
220		self.persistentLog(msg,m, formated_msg); //首先进行持久化处理
221		self.consoleOutputLog(msg, m, formated_msg);    //然后进行控制台处理
222	}
223
224	/*真正的处理Log日志的任务,初始化日志实例时创建线程任务运行*/
225	fn processLog(&mut self,msg:&InnerLogMsg,origin_msg:&[u8],sender:&SocketAddr) {
226			self.logSeq+=1;
227			let ret = self.processListener(msg, self.logSeq);
228			if !ret {
229				return
230			}
231			let seq = self.logSeq;
232			/*每次都调用RegisterLogModule是为解决先在服务端初始化模块级日志,后收到日志的问题*/
233			let mut client = match self.RegisterLogModule(&msg.ModuleName, sender) {
234				None=>return,
235				Some(m)=> m,
236			};
237	
238			client.logPackets+=1;
239			client.logBytes += origin_msg.len() as u64;
240
241			/*输出Log文件*/
242			let c= unsafe {&mut *(client as *mut log_client_t)};
243			let strMsg = LogFormat(msg,seq,sender);
244			self.innerOutputLog(msg, c,&strMsg);
245	}
246
247
248
249	/*周期性日志存盘的操作*/
250	fn flushLogFile(&mut self) {
251	
252		if self.unSyncedMsg <= 0 {
253			return
254		}
255		let cur = common::get_now_usec64();
256
257		if self.unSyncedMsg >= MAX_UNSYNCED_MSG ||
258			cur>=self.lastWriteTime.to_usecs()+1000*1000*LOG_SYNC_DISK_PERIOD {
259			self.forceSyncLogFile();
260		}
261
262	}
263
264/*处理日志持久化流程,首先判断级别是否够*/
265fn persistentLog(&mut self,msgStru:&InnerLogMsg, m:&log_client_t, formated_msg:&String)->errcode::RESULT {
266	if msgStru.LogLevel > self.service_conf.persistentLevel || msgStru.LogLevel > m.persistent_log_level {
267		return errcode::ERROR_NO_OP;
268	}
269
270	let cur =common::get_datetime_from_std(&time::SystemTime::now());
271	let last = self.lastWriteTime.clone();
272	/*已经是另外一天,需要打开新的日志文件*/
273	if (cur.get_day_in_year() != last.get_day_in_year()) || (cur.get_year()!= last.get_year()) {
274		self.forceSyncLogFile();
275
276		/*自动根据当前日期计算一个新的日志文件*/
277		let err = self.openLogFile();
278		if err != errcode::RESULT_SUCCESS {
279			return err
280		}
281	}
282
283	self.write_to_file(formated_msg.as_bytes());
284	self.unSyncedMsg+=1;
285	//println!("[log server]write to disk,seq={},unsynced={},msg_len={}",self.logSeq,self.unSyncedMsg,formated_msg.len());
286	self.flushLogFile();
287	self.lastWriteTime = common::get_datetime_from_std(&time::SystemTime::now());
288	return errcode::RESULT_SUCCESS
289}
290
291	fn write_to_file(&mut self,buf:&[u8])->errcode::RESULT {
292		if let Some(ref mut f) = &mut self.curLogFile {
293			let _ = f.write(buf);
294				return errcode::RESULT_SUCCESS
295		} else {
296			return errcode::ERROR_OPEN_FILE;
297		}
298	}
299/*强制刷新到磁盘文件,将未同步的日志文件,刷新到磁盘*/
300fn forceSyncLogFile(&mut self) {
301	/*假如当前日志超过了存盘的最大尺寸限制,则截断日志大小*/
302	let file=match &mut self.curLogFile {
303		None=>return,
304		Some(f)=>f,
305	};
306	if let Ok(stats) = file.metadata() {
307		if stats.len()>self.service_conf.maxStorageSize {
308			let _ = file.set_len(self.service_conf.maxStorageSize);
309					
310		}
311	}
312	let _ = file.sync_all();
313	self.unSyncedMsg = 0;
314
315}
316
317	/*处理日志控制台输出流程,首先判断日志输出级别是否可以输出*/
318   fn consoleOutputLog(&mut self,msgStru:&InnerLogMsg, m:&log_client_t, formated_msg:&String) {
319		if msgStru.LogLevel > self.service_conf.consoleLevel || msgStru.LogLevel > m.console_log_level {
320			return
321		}
322		let _ = self.stdout.write(formated_msg.as_bytes());	
323		self.send_to_syslog_server(msgStru, formated_msg);
324	}
325
326	fn send_to_syslog_server(&mut self,msgStru:&InnerLogMsg, formated_msg:&String) {
327		//to-do
328		if let Some(syslog)=&mut self.sys_client {
329			syslog.send_encoded_msg(formated_msg);
330		}
331	}
332	/*日志监听器注册,LogService将所有的日志均发送给监听器*/
333	pub fn RegisterListener(&mut self,name:&String, listener:&'static mut dyn LogListener)->errcode::RESULT {
334
335		if self.logListener.contains_key(name) {
336			return errcode::ERROR_ALREADY_EXIST;
337		}
338		self.logListener.insert(name.clone(),listener as *mut dyn LogListener);
339		return errcode::RESULT_SUCCESS
340	}
341
342	pub fn DeregisterListener(&mut self,name:&String)->errcode::RESULT {
343		if !self.logListener.contains_key(name) {
344			return errcode::ERROR_NOT_FOUND;
345		}
346		self.logListener.remove(name);
347		return errcode::RESULT_SUCCESS
348	}
349
350	
351	fn do_clean(&mut self) {
352		let cur = time::SystemTime::now();
353		let mut toBeClean=false;
354		let mut total_file_size =0u64;
355		println!("[LogClean Task]Begin Log Clean Task");
356		for i in 1..MAX_LOG_FILE {
357
358			let tm = common::get_datetime_from_std(&cur.checked_sub(Duration::from_secs(3600*24*i as u64)).unwrap());
359			let f1 = self.getLogFileName(&tm);
360			let f2 = format!("{}{}",self.GetLogFilePath(),f1);
361			let ziped = format!("{}{}",f2, ".zip");
362			if toBeClean {
363				let _ = fs::remove_file(ziped);
364				let _ = fs::remove_file(f2);
365				continue
366			}
367			
368			if errcode::RESULT_SUCCESS == compressFile(&f2, &ziped) {
369					let _ = fs::remove_file(f2);
370			};
371			
372
373		match fs::metadata(ziped) {
374			Err(_)=>continue,
375			Ok(m)=>{
376				total_file_size+=m.len();
377				if total_file_size>self.service_conf.maxStorageSize || i>self.service_conf.maxStoragePeriod as usize {
378					toBeClean = true;
379				}
380			},
381		}
382		}
383	}
384
385    pub fn PrintLogServiceStats(&self) {
386        println!("LogService: Ip={},port={},SysLogServer={:?}\n",
387            self.service_conf.self_addr.ip(), self.service_conf.self_addr.port(), 
388			self.service_conf.syslog_server);
389    
390			println!("LogService: PersitentLevel={}, ConsoleLevel={}, max_disk_size={}Bytes,max_Period={} Days\n", 
391			self.service_conf.persistentLevel,
392            self.service_conf.consoleLevel, self.service_conf.maxStorageSize, self.service_conf.maxStoragePeriod);
393    
394        println!("LogService: Recv Log Packets={}, bytes={},queue_cap={},len={}\n",
395            self.logPackets, self.LogBytes,self.queue.capacity(),self.queue.len());
396    
397		println!("--------------Log Module----------------");
398        for (_, v) in self.logMuduleControl.iter() {
399            println!("ModuleName={}, \taddr={}:{}, PersitentLevel={}, ConsoleLevel={},recv_packets={},bytes={}\n", 
400				v.name,v.addr.ip(), v.addr.port(), v.persistent_log_level,
401				v.console_log_level, v.logPackets, v.logBytes);
402        }
403		self.logMuduleControl.end_iter();
404    }
405    
406
407}
408
409static mut gLogServer:Option<log_service_t>=None;
410/*init log service,parameter is log_service_config_t*/
411pub fn InitLogService(conf:&log_service_config_t) ->errcode::RESULT {
412	unsafe {
413    if gLogServer.is_none() {
414            gLogServer=log_service_t::new(conf);
415    }
416	}
417	let service=match unsafe {&mut gLogServer} {
418        None=>{
419            return errcode::ERROR_INIT_FAILED;
420        },
421        Some(s)=>s,
422    };
423    service.service_conf.logFilePath=formatLogPath(&service.service_conf.logFilePath);
424
425	let ret = service.openLogFile();
426	if ret != errcode::RESULT_SUCCESS {
427		return ret
428	}
429	std::thread::spawn(||run_log_service());
430	InitLogCleanTask(); //初始化清理任务,定期清理任务
431
432	return errcode::RESULT_SUCCESS
433}
434
435fn run_log_service() {
436	let service=match unsafe {&mut gLogServer} {
437        None=>{
438            return ;
439        },
440        Some(s)=>s,
441    };
442
443	loop {
444		service.recvLog();
445	}
446}
447
448
449/*日志监听器注册,LogService将所有的日志均发送给监听器*/
450pub fn RegisterListener(name:&String, listener:&'static mut dyn LogListener)->errcode::RESULT {
451	let service=match unsafe {&mut gLogServer} {
452        None=>{
453            return errcode::ERROR_INIT_FAILED;
454        },
455        Some(s)=>s,
456    };
457	
458	return service.RegisterListener(name,listener);
459}
460
461/*监听器去注册*/
462fn DeregisterListener(name:&String)->errcode::RESULT {
463	let service=match unsafe {&mut gLogServer} {
464        None=>{
465            return errcode::ERROR_INIT_FAILED;
466        },
467        Some(s)=>s,
468    };
469	
470	return service.DeregisterListener(name);
471}
472
473pub fn set_syslog_addr(addr:&SocketAddr)->errcode::RESULT {
474	let service=match unsafe {&mut gLogServer} {
475        None=>{
476            return errcode::ERROR_INIT_FAILED;
477        },
478        Some(s)=>s,
479    };
480	
481	return service.set_syslog_addr(addr);
482}
/// Normalizes a log directory path.
///
/// Surrounding whitespace is trimmed. An empty result becomes `/var/log/`; any other
/// path gets a trailing `/` appended when it does not already end with one.
fn formatLogPath(filePath:&String)->String {
    let trimmed = filePath.trim();
    if trimmed.is_empty() {
        return String::from("/var/log/");
    }

    let mut dir = String::from(trimmed);
    if !dir.ends_with('/') {
        dir.push('/');
    }
    dir
}
496
497pub(crate) fn InitLogCleanTask() {
498	std::thread::spawn(|| LogCleanTask()); //日志清理的任务
499}
500
501/*整理日志文件,仅保留最近30天并不超过总容量的日志文件,并且对每天的日志文件进行压缩处理
502对于一天以及以前的文件压缩为.zip文件,并删除原始Log文件*/
503fn LogCleanTask() {
504	let log_serv = match unsafe{&mut gLogServer} {
505		None=>return,
506		Some(s)=>s,
507	};
508	loop {
509		thread::sleep(time::Duration::from_secs(3600));
510		log_serv.do_clean();
511	}
512}