1#![allow(non_camel_case_types)]
2#![allow(non_snake_case)]
3#![allow(non_upper_case_globals)]
4#![allow(dead_code)]
5
6use super::*;
7use crate::common::{errcode,tsmap::TsHashMap};
8use std::net::{SocketAddr,UdpSocket,IpAddr};
9use std::time::{self,Duration,SystemTime};
10use std::fs::{self,OpenOptions};
11use std::collections::{VecDeque};
12use std::io::{self,Read,Write};
13use std::thread;
14
/// Initial capacity reserved for the service's log queue.
const MAX_LOG_QUEUE_LEN: usize = 2_048;
/// Size in bytes of the buffer used to receive a single log datagram.
const MAX_LOG_MSG_LEN: usize = 65_000;
/// NOTE(review): not referenced in the visible part of this file; presumably
/// the size limit for out-of-band messages — confirm against its users.
const MAX_OOB_MSG_LEN: usize = 4_096;
/// Number of written-but-unsynced messages that forces a disk sync.
const MAX_UNSYNCED_MSG: i32 = 256;
/// Seconds after the last write at which pending messages are synced to disk.
const LOG_SYNC_DISK_PERIOD: u64 = 2;
/// Exclusive upper bound on how many days back the clean task scans.
const MAX_LOG_FILE: usize = 365;
21
22impl log_service_t {
23 pub fn new(conf:&log_service_config_t)->Option<log_service_t> {
24 let sck=match UdpSocket::bind(conf.self_addr) {
25 Err(_)=>return None,
26 Ok(s)=>s,
27 };
28
29 let mut service = Self {
30 service_conf:conf.clone(),
31 sck:sck,
32 curLogFile:None,
33 unSyncedMsg:0, lastWriteTime:common::get_datetime_from_std(&SystemTime::now()),
35 queue:VecDeque::with_capacity(MAX_LOG_QUEUE_LEN),
36 logMuduleControl:TsHashMap::new(128), logModuleIndex:TsHashMap::new(128), logListener:TsHashMap::new(128),
39 logPackets:0,
40 LogBytes:0,
41 logSeq:1,
42 stdout:io::stdout(),
43 sys_client:None,
44 };
45 if let Some(syslog_addr) = conf.syslog_server {
46 if let Ok(mut syslog)=syslog::sys_log_client_t::new(&conf.self_addr) {
47 syslog.set_server_addr(&syslog_addr);
48 service.sys_client=Some(syslog);
49 }
50
51 }
52
53 return Some(service)
54 }
55 pub fn openLogFile(&mut self)->errcode::RESULT {
57 let tm = common::get_datetime_from_std(&SystemTime::now());
58
59 let fileName = format!("{}{}",self.service_conf.logFilePath, self.getLogFileName(&tm));
60
61 let fd = match OpenOptions::new().create(true).write(true).append(true).open(fileName.clone()) {
62 Ok(f)=>f,
63 Err(e)=>{
64 println!("open file {} error,err={}",fileName,e);
65 return errcode::ERROR_OPEN_FILE;
66 },
67 };
68 self.curLogFile=Some(fd);
69 return errcode::RESULT_SUCCESS
70 }
71
72 fn getLogFileName(&self,tm:&common::datetime_t)->String {
74 let fileName = format!("{}_{:#04}{:#02}{:#02}.log", self.service_conf.logFilePrefix,
75 tm.get_year(), tm.get_mon_in_year(), tm.get_day_in_mon());
76 return fileName
77 }
78
79 pub fn GetLogFilePath(&self)->String {
81 return self.service_conf.logFilePath.clone()
82 }
83
84 pub fn init(&mut self) {
86
87 }
88 fn SetGlobalPersitentLogLevel(&mut self,newLevel:LOG_LEVEL) {
90 self.service_conf.persistentLevel = newLevel;
91
92 }
93
94 fn SetGlobalConsoleLogLevel(&mut self,newLevel:LOG_LEVEL) {
95 self.service_conf.consoleLevel = newLevel;
96 }
97
98fn SetModulePersitentLogLevel(&mut self,name:&String, newLevel:LOG_LEVEL)->errcode::RESULT {
100 if let Some(module) = self.logMuduleControl.get_mut(name) {
101 module.persistent_log_level = newLevel;
102 } else {
103 let module = log_client_t{
104 name: name.clone(),
105 persistent_log_level:newLevel,
106 console_log_level: LOG_DEF_CONSOLE_OUTPUT_LEVEL,
107 addr:SocketAddr::new(IpAddr::from([127,0,0,1]),0),
108 logBytes:0,
109 logPackets:0,
110 };
111 self.logMuduleControl.insert(name.clone(),module);
112 }
113 return errcode::RESULT_SUCCESS
114
115 }
116
117fn SetModuleConsoleLogLevel(&mut self,name:&String, newLevel:LOG_LEVEL)->errcode::RESULT {
119 if let Some(module) = self.logMuduleControl.get_mut(name) {
120 module.console_log_level = newLevel;
121 } else {
122 let module = log_client_t{
123 name: name.clone(),
124 persistent_log_level:LOG_DEF_PERSISTENT_LEVEL,
125 console_log_level: newLevel,
126 addr:SocketAddr::new(IpAddr::from([127,0,0,1]),0),
127 logBytes:0,
128 logPackets:0,
129 };
130 self.logMuduleControl.insert(name.clone(),module);
131 }
132
133 return errcode::RESULT_SUCCESS
134 }
135
136 fn SetLogPersitentParam(&mut self,maxDiskSize:u64, maxPeriod:i32) {
138 self.service_conf.maxStorageSize = maxDiskSize;
139
140 if maxPeriod >= 0 {
141 self.service_conf.maxStoragePeriod = maxPeriod;
142 }
143 }
144
145 pub fn set_syslog_addr(&mut self,addr:&SocketAddr)->errcode::RESULT {
146 if let Some(log)=&mut self.sys_client {
147 log.set_server_addr(addr);
148 return errcode::RESULT_SUCCESS;
149 }
150 return errcode::ERROR_NOT_INITIALIZED;
151 }
152
153 fn RegisterLogModule<'a>(&'a mut self,name:&String, addr:&SocketAddr)->Option<&'a mut log_client_t> {
155 if self.logMuduleControl.contains_key(name) {
156 match self.logMuduleControl.get_mut(name) {
157 None=>return None,
158 Some(m)=> {
159 if m.addr.port() == 0 {
160 self.logModuleIndex.insert(addr.clone(),name.clone());
161 m.addr=addr.clone();
162 }
163 return Some(m);
164 },
165 }
166 }
167
168 self.logModuleIndex.insert(addr.clone(),name.clone());
169 let module=log_client_t{
170 persistent_log_level: LOG_LEVEL_WARNING, console_log_level: LOG_LEVEL_INFO,
171 name: name.clone(), addr:addr.clone(),
172 logBytes:0,
173 logPackets:0,};
174 self.logMuduleControl.insert(name.clone(),module);
175 return self.logMuduleControl.get_mut(name);
176
177 }
178 fn processListener(&mut self,msgStru:&InnerLogMsg, msgSeq:u64)->bool {
180 for (_, v) in self.logListener.iter_mut() {
181 let callback = unsafe {&mut *(*v)};
182 let ret = callback.NotifyLog(msgStru, msgSeq);
183 if !ret {
184 return false;
185 }
186 }
187 self.logListener.end_iter();
188 return true
189 }
190
191 pub fn recvLog(&mut self) {
194 println!("LogServer: Begin Receiving Log Message");
195 let _ = self.sck.set_read_timeout(Some(Duration::from_millis(50)));
196 let mut recv_buf=[0u8;MAX_LOG_MSG_LEN];
197 loop {
198 let (len,addr)=match self.sck.recv_from(&mut recv_buf[..]) {
199 Err(_)=>continue,
200 Ok((l,a))=>(l,a),
201 };
202 self.logPackets+=1;
204 self.LogBytes+=len as u64;
205 let logStru = match serde_json::from_slice::<InnerLogMsg>(&recv_buf[0..len]) {
206 Err(_e)=> {
207 println!("log server decode message err {},len={}",_e,len);
208 continue;
209 },
210 Ok(d)=>d,
211 };
212 self.processLog(&logStru, &recv_buf[0..len], &addr);
213
214 }
215
216 }
217
218 fn innerOutputLog(&mut self,msg:&InnerLogMsg, m:&mut log_client_t,formated_msg:&String) {
220 self.persistentLog(msg,m, formated_msg); self.consoleOutputLog(msg, m, formated_msg); }
223
224 fn processLog(&mut self,msg:&InnerLogMsg,origin_msg:&[u8],sender:&SocketAddr) {
226 self.logSeq+=1;
227 let ret = self.processListener(msg, self.logSeq);
228 if !ret {
229 return
230 }
231 let seq = self.logSeq;
232 let mut client = match self.RegisterLogModule(&msg.ModuleName, sender) {
234 None=>return,
235 Some(m)=> m,
236 };
237
238 client.logPackets+=1;
239 client.logBytes += origin_msg.len() as u64;
240
241 let c= unsafe {&mut *(client as *mut log_client_t)};
243 let strMsg = LogFormat(msg,seq,sender);
244 self.innerOutputLog(msg, c,&strMsg);
245 }
246
247
248
249 fn flushLogFile(&mut self) {
251
252 if self.unSyncedMsg <= 0 {
253 return
254 }
255 let cur = common::get_now_usec64();
256
257 if self.unSyncedMsg >= MAX_UNSYNCED_MSG ||
258 cur>=self.lastWriteTime.to_usecs()+1000*1000*LOG_SYNC_DISK_PERIOD {
259 self.forceSyncLogFile();
260 }
261
262 }
263
264fn persistentLog(&mut self,msgStru:&InnerLogMsg, m:&log_client_t, formated_msg:&String)->errcode::RESULT {
266 if msgStru.LogLevel > self.service_conf.persistentLevel || msgStru.LogLevel > m.persistent_log_level {
267 return errcode::ERROR_NO_OP;
268 }
269
270 let cur =common::get_datetime_from_std(&time::SystemTime::now());
271 let last = self.lastWriteTime.clone();
272 if (cur.get_day_in_year() != last.get_day_in_year()) || (cur.get_year()!= last.get_year()) {
274 self.forceSyncLogFile();
275
276 let err = self.openLogFile();
278 if err != errcode::RESULT_SUCCESS {
279 return err
280 }
281 }
282
283 self.write_to_file(formated_msg.as_bytes());
284 self.unSyncedMsg+=1;
285 self.flushLogFile();
287 self.lastWriteTime = common::get_datetime_from_std(&time::SystemTime::now());
288 return errcode::RESULT_SUCCESS
289}
290
291 fn write_to_file(&mut self,buf:&[u8])->errcode::RESULT {
292 if let Some(ref mut f) = &mut self.curLogFile {
293 let _ = f.write(buf);
294 return errcode::RESULT_SUCCESS
295 } else {
296 return errcode::ERROR_OPEN_FILE;
297 }
298 }
299fn forceSyncLogFile(&mut self) {
301 let file=match &mut self.curLogFile {
303 None=>return,
304 Some(f)=>f,
305 };
306 if let Ok(stats) = file.metadata() {
307 if stats.len()>self.service_conf.maxStorageSize {
308 let _ = file.set_len(self.service_conf.maxStorageSize);
309
310 }
311 }
312 let _ = file.sync_all();
313 self.unSyncedMsg = 0;
314
315}
316
317 fn consoleOutputLog(&mut self,msgStru:&InnerLogMsg, m:&log_client_t, formated_msg:&String) {
319 if msgStru.LogLevel > self.service_conf.consoleLevel || msgStru.LogLevel > m.console_log_level {
320 return
321 }
322 let _ = self.stdout.write(formated_msg.as_bytes());
323 self.send_to_syslog_server(msgStru, formated_msg);
324 }
325
326 fn send_to_syslog_server(&mut self,msgStru:&InnerLogMsg, formated_msg:&String) {
327 if let Some(syslog)=&mut self.sys_client {
329 syslog.send_encoded_msg(formated_msg);
330 }
331 }
332 pub fn RegisterListener(&mut self,name:&String, listener:&'static mut dyn LogListener)->errcode::RESULT {
334
335 if self.logListener.contains_key(name) {
336 return errcode::ERROR_ALREADY_EXIST;
337 }
338 self.logListener.insert(name.clone(),listener as *mut dyn LogListener);
339 return errcode::RESULT_SUCCESS
340 }
341
342 pub fn DeregisterListener(&mut self,name:&String)->errcode::RESULT {
343 if !self.logListener.contains_key(name) {
344 return errcode::ERROR_NOT_FOUND;
345 }
346 self.logListener.remove(name);
347 return errcode::RESULT_SUCCESS
348 }
349
350
351 fn do_clean(&mut self) {
352 let cur = time::SystemTime::now();
353 let mut toBeClean=false;
354 let mut total_file_size =0u64;
355 println!("[LogClean Task]Begin Log Clean Task");
356 for i in 1..MAX_LOG_FILE {
357
358 let tm = common::get_datetime_from_std(&cur.checked_sub(Duration::from_secs(3600*24*i as u64)).unwrap());
359 let f1 = self.getLogFileName(&tm);
360 let f2 = format!("{}{}",self.GetLogFilePath(),f1);
361 let ziped = format!("{}{}",f2, ".zip");
362 if toBeClean {
363 let _ = fs::remove_file(ziped);
364 let _ = fs::remove_file(f2);
365 continue
366 }
367
368 if errcode::RESULT_SUCCESS == compressFile(&f2, &ziped) {
369 let _ = fs::remove_file(f2);
370 };
371
372
373 match fs::metadata(ziped) {
374 Err(_)=>continue,
375 Ok(m)=>{
376 total_file_size+=m.len();
377 if total_file_size>self.service_conf.maxStorageSize || i>self.service_conf.maxStoragePeriod as usize {
378 toBeClean = true;
379 }
380 },
381 }
382 }
383 }
384
385 pub fn PrintLogServiceStats(&self) {
386 println!("LogService: Ip={},port={},SysLogServer={:?}\n",
387 self.service_conf.self_addr.ip(), self.service_conf.self_addr.port(),
388 self.service_conf.syslog_server);
389
390 println!("LogService: PersitentLevel={}, ConsoleLevel={}, max_disk_size={}Bytes,max_Period={} Days\n",
391 self.service_conf.persistentLevel,
392 self.service_conf.consoleLevel, self.service_conf.maxStorageSize, self.service_conf.maxStoragePeriod);
393
394 println!("LogService: Recv Log Packets={}, bytes={},queue_cap={},len={}\n",
395 self.logPackets, self.LogBytes,self.queue.capacity(),self.queue.len());
396
397 println!("--------------Log Module----------------");
398 for (_, v) in self.logMuduleControl.iter() {
399 println!("ModuleName={}, \taddr={}:{}, PersitentLevel={}, ConsoleLevel={},recv_packets={},bytes={}\n",
400 v.name,v.addr.ip(), v.addr.port(), v.persistent_log_level,
401 v.console_log_level, v.logPackets, v.logBytes);
402 }
403 self.logMuduleControl.end_iter();
404 }
405
406
407}
408
409static mut gLogServer:Option<log_service_t>=None;
410pub fn InitLogService(conf:&log_service_config_t) ->errcode::RESULT {
412 unsafe {
413 if gLogServer.is_none() {
414 gLogServer=log_service_t::new(conf);
415 }
416 }
417 let service=match unsafe {&mut gLogServer} {
418 None=>{
419 return errcode::ERROR_INIT_FAILED;
420 },
421 Some(s)=>s,
422 };
423 service.service_conf.logFilePath=formatLogPath(&service.service_conf.logFilePath);
424
425 let ret = service.openLogFile();
426 if ret != errcode::RESULT_SUCCESS {
427 return ret
428 }
429 std::thread::spawn(||run_log_service());
430 InitLogCleanTask(); return errcode::RESULT_SUCCESS
433}
434
435fn run_log_service() {
436 let service=match unsafe {&mut gLogServer} {
437 None=>{
438 return ;
439 },
440 Some(s)=>s,
441 };
442
443 loop {
444 service.recvLog();
445 }
446}
447
448
449pub fn RegisterListener(name:&String, listener:&'static mut dyn LogListener)->errcode::RESULT {
451 let service=match unsafe {&mut gLogServer} {
452 None=>{
453 return errcode::ERROR_INIT_FAILED;
454 },
455 Some(s)=>s,
456 };
457
458 return service.RegisterListener(name,listener);
459}
460
461fn DeregisterListener(name:&String)->errcode::RESULT {
463 let service=match unsafe {&mut gLogServer} {
464 None=>{
465 return errcode::ERROR_INIT_FAILED;
466 },
467 Some(s)=>s,
468 };
469
470 return service.DeregisterListener(name);
471}
472
473pub fn set_syslog_addr(addr:&SocketAddr)->errcode::RESULT {
474 let service=match unsafe {&mut gLogServer} {
475 None=>{
476 return errcode::ERROR_INIT_FAILED;
477 },
478 Some(s)=>s,
479 };
480
481 return service.set_syslog_addr(addr);
482}
/// Normalizes a log directory path.
///
/// Surrounding whitespace is trimmed. An empty result becomes `/var/log/`;
/// any other path is returned with exactly one trailing `/` appended when it
/// does not already end with one.
fn formatLogPath(filePath: &String) -> String {
    let trimmed = filePath.trim();
    if trimmed.is_empty() {
        return String::from("/var/log/");
    }

    let mut normalized = String::from(trimmed);
    if !normalized.ends_with('/') {
        normalized.push('/');
    }
    normalized
}
496
497pub(crate) fn InitLogCleanTask() {
498 std::thread::spawn(|| LogCleanTask()); }
500
501fn LogCleanTask() {
504 let log_serv = match unsafe{&mut gLogServer} {
505 None=>return,
506 Some(s)=>s,
507 };
508 loop {
509 thread::sleep(time::Duration::from_secs(3600));
510 log_serv.do_clean();
511 }
512}