1use crate::{
2 config::SinkConfigTrait,
3 log_impl::{LogSink, LogSinkTrait},
4 time::Timer,
5};
6use crossfire::*;
7use log::{Level, Record};
8use std::hash::{Hash, Hasher};
9use std::io::{BufWriter, Error, ErrorKind, Result, Write};
10use std::net::{TcpStream, ToSocketAddrs, UdpSocket};
11use std::os::unix::net::{UnixDatagram, UnixStream};
12use std::path::{Path, PathBuf};
13use std::sync::{Arc, Once};
14use std::thread;
15use std::time::{Duration, Instant};
16pub use syslog::Facility;
17use syslog::{Formatter3164, LogFormat as SyslogFormat, LoggerBackend as SyslogBackend, Severity};
18
19const TIMEOUT_DEFAULT: Duration = Duration::from_secs(5);
20const UNIX_SOCK_PATHS: [&str; 3] = ["/dev/log", "/var/run/syslog", "/var/run/log"];
21const LOCAL_TCP: &'static str = "127.0.0.1:601";
24
25#[allow(dead_code)]
26const LOCAL_UDP: &'static str = "127.0.0.1:514";
27
28#[derive(Hash)]
29pub enum SyslogProto {
30 RFC3164,
31}
32#[derive(Hash, Clone, Debug)]
36pub enum SyslogAddr {
37 TCP(String),
39 UDP(String, String),
41 Unix(PathBuf),
43}
44
45pub struct Syslog {
85 pub facility: Facility,
87 pub process: Option<String>,
89 pub hostname: Option<String>,
91 pub level: Level,
93 pub proto: SyslogProto,
95 pub server: Option<SyslogAddr>,
97 pub timeout: Duration,
99}
100
101impl Hash for Syslog {
102 fn hash<H: Hasher>(&self, hasher: &mut H) {
103 hasher.write_u32(self.facility as u32);
104 self.process.hash(hasher);
105 self.hostname.hash(hasher);
106 self.level.hash(hasher);
107 self.proto.hash(hasher);
108 self.timeout.hash(hasher);
109 self.server.hash(hasher);
110 }
111}
112
113impl Default for Syslog {
114 fn default() -> Self {
115 Self {
116 proto: SyslogProto::RFC3164,
117 facility: Facility::LOG_USER,
118 process: None,
119 hostname: None,
120 level: Level::Trace,
121 timeout: TIMEOUT_DEFAULT,
122 server: None,
123 }
124 }
125}
126
127impl Syslog {
128 pub fn new(facility: Facility, level: Level) -> Self {
129 let mut s = Self::default();
130 s.proto = SyslogProto::RFC3164;
131 s.facility = facility;
132 s.level = level;
133 s
134 }
135
136 pub fn timeout(mut self, d: Duration) -> Self {
137 self.timeout = d;
138 self
139 }
140
141 pub fn hostname(mut self, name: String) -> Self {
143 self.hostname = Some(name);
144 self
145 }
146
147 pub fn process_name(mut self, name: String) -> Self {
149 self.process = Some(name);
150 self
151 }
152
153 pub fn unix<P: Into<PathBuf>>(mut self, p: P) -> Self {
154 self.server = Some(SyslogAddr::Unix(p.into()));
155 self
156 }
157
158 pub fn tcp<S: AsRef<str>>(mut self, remote: S) -> Self {
159 self.server = Some(SyslogAddr::TCP(remote.as_ref().to_string()));
160 self
161 }
162
163 pub fn udp<S: AsRef<str>>(mut self, local: S, remote: S) -> Self {
164 self.server =
165 Some(SyslogAddr::UDP(local.as_ref().to_string(), remote.as_ref().to_string()));
166 self
167 }
168}
169
170impl SinkConfigTrait for Syslog {
171 fn get_level(&self) -> Level {
172 self.level
173 }
174
175 fn get_file_path(&self) -> Option<Box<Path>> {
176 None
177 }
178
179 fn write_hash(&self, hasher: &mut Box<dyn Hasher>) {
180 self.hash(hasher);
181 hasher.write(b"Syslog");
182 }
183
184 fn build(&self) -> LogSink {
185 LogSink::Syslog(LogSinkSyslog::new(self))
186 }
187}
188
189enum Msg {
190 Line(Vec<u8>),
191 Flush(Arc<Once>),
192}
193
194pub(crate) struct LogSinkSyslog {
195 tx: MTx<Msg>,
196 format: Formatter3164,
197 max_level: Level,
198}
199
200impl LogSinkSyslog {
201 fn new(config: &Syslog) -> Self {
202 let (tx, rx) = mpsc::bounded_blocking(100);
203
204 macro_rules! fill_format {
205 ($f: expr, $config: expr) => {{
206 $f.facility = $config.facility;
207 if $config.server.is_some() {
208 if let Some(hostname) = &$config.hostname {
210 $f.hostname = Some(hostname.clone());
211 }
212 } else {
213 $f.hostname = None;
215 }
216 if let Some(process) = &$config.process {
217 $f.process = process.clone();
218 }
219 }};
220 }
221 let mut timeout = config.timeout;
222 if timeout == Duration::from_secs(0) {
223 timeout = TIMEOUT_DEFAULT;
224 }
225 let mut backend = Backend { server: config.server.clone(), timeout, writer: None };
226 let _ = backend.reinit();
227
228 let mut f = Formatter3164::default();
229 fill_format!(f, config);
230 thread::spawn(move || backend.run(rx));
231 Self { tx, max_level: config.level, format: f }
232 }
233}
234
235impl LogSinkTrait for LogSinkSyslog {
236 fn reopen(&self) -> std::io::Result<()> {
237 Ok(())
238 }
239
240 #[inline(always)]
241 fn log(&self, _now: &Timer, r: &Record) {
242 let l = r.level();
243 if r.level() <= self.max_level {
244 let mut buf = Vec::with_capacity(128);
245 let _level = match l {
246 Level::Trace => Severity::LOG_DEBUG, Level::Debug => Severity::LOG_DEBUG,
248 Level::Info => Severity::LOG_INFO,
249 Level::Warn => Severity::LOG_WARNING,
250 Level::Error => Severity::LOG_ERR,
251 };
252 let msg = format!("{}", r.args());
253 self.format.format(&mut buf, _level, msg).expect("format");
254 let _ = self.tx.send(Msg::Line(buf));
255 }
256 }
257
258 #[inline(always)]
259 fn flush(&self) {
260 let o = Arc::new(Once::new());
261 if self.tx.send(Msg::Flush(o.clone())).is_ok() {
262 o.wait();
263 }
264 }
265}
266
267struct Backend {
268 server: Option<SyslogAddr>,
269 writer: Option<SyslogBackend>,
270 timeout: Duration,
271}
272
273impl Backend {
274 #[inline]
275 fn connect_unix(path: &Path) -> Result<SyslogBackend> {
276 let sock = UnixDatagram::unbound()?;
277 match sock.connect(Path::new(path)) {
278 Ok(()) => {
279 println!("syslog: connect to unix {:?}", path);
280 return Ok(SyslogBackend::Unix(sock));
281 }
282 Err(e) => {
283 if e.raw_os_error() == Some(libc::EPROTOTYPE) {
284 let sock = UnixStream::connect(path)?;
285 println!("syslog: connect to unix {:?}", path);
286 return Ok(SyslogBackend::UnixStream(BufWriter::new(sock)));
287 }
288 return Err(e);
289 }
290 }
291 }
292
293 #[inline]
294 fn connect_tcp(s: &str, timeout: Duration) -> Result<SyslogBackend> {
295 for addr in s.to_socket_addrs()? {
296 let socket = TcpStream::connect_timeout(&addr, timeout)?;
297 return Ok(SyslogBackend::Tcp(BufWriter::new(socket)));
298 }
299 Err(Error::new(ErrorKind::NotFound, "syslog: no server address").into())
300 }
301
302 #[inline]
303 fn connect_udp(local: &str, remote: &str) -> Result<SyslogBackend> {
304 let server_addr = remote.to_socket_addrs().and_then(|mut server_addr_opt| {
305 server_addr_opt
306 .next()
307 .ok_or_else(|| Error::new(ErrorKind::NotFound, "syslog: no server address").into())
308 })?;
309 println!("syslog: connect to udp {:?}", remote);
310 let socket = UdpSocket::bind(local)?;
311 return Ok(SyslogBackend::Udp(socket, server_addr));
312 }
313
314 fn connect(server: &Option<SyslogAddr>, timeout: Duration) -> Result<SyslogBackend> {
315 match server {
316 Some(SyslogAddr::Unix(p)) => Self::connect_unix(p.as_path()),
317 Some(SyslogAddr::UDP(local, remote)) => Self::connect_udp(&local, &remote),
318 Some(SyslogAddr::TCP(remote)) => Self::connect_tcp(&remote, timeout),
319 None => {
320 for p in &UNIX_SOCK_PATHS {
321 if let Ok(backend) = Self::connect_unix(Path::new(p)) {
322 return Ok(backend);
323 }
324 }
325 return Self::connect_tcp(LOCAL_TCP, timeout);
326 }
330 }
331 }
332
333 #[inline(always)]
334 fn reinit(&mut self) -> Result<()> {
335 match Self::connect(&self.server, self.timeout) {
336 Err(e) => {
337 eprintln!("syslog: connect {:?} err {:?}", e, self.server);
338 return Err(e);
339 }
340 Ok(backend) => {
341 self.writer = Some(backend);
342 Ok(())
343 }
344 }
345 }
346
347 #[inline(always)]
348 fn flush(&mut self) {
349 if let Some(writer) = self.writer.as_mut() {
350 if let Err(e) = writer.flush() {
351 eprintln!("syslog: flush err {:?}", e);
352 self.writer = None;
353 }
354 }
355 }
356
357 #[inline]
358 fn write(&mut self, msg: &[u8]) {
359 if let Some(writer) = self.writer.as_mut() {
360 match writer.write_all(msg) {
361 Ok(_) => return,
362 Err(e) => {
363 eprintln!("syslog: write err {:?}", e);
364 self.writer = None;
365 }
366 }
367 }
368 let start_ts = Instant::now();
369 loop {
370 thread::sleep(Duration::from_millis(500));
371 if self.reinit().is_ok() {
372 if let Some(writer) = self.writer.as_mut() {
373 match writer.write_all(msg) {
374 Ok(_) => return,
375 Err(e) => {
376 eprintln!("syslog: write err {:?}", e);
377 self.writer = None;
378 }
379 }
380 }
381 }
382 if Instant::now().duration_since(start_ts) > self.timeout {
383 return;
385 }
386 }
387 }
388
389 fn run(&mut self, rx: Rx<Msg>) {
390 loop {
391 match rx.recv() {
392 Ok(Msg::Line(_msg)) => {
393 self.write(&_msg);
394 let mut need_flush = true;
395 while let Ok(msg) = rx.try_recv() {
396 match msg {
397 Msg::Line(_msg) => self.write(&_msg),
398 Msg::Flush(o) => {
399 self.flush();
400 o.call_once(|| {});
401 need_flush = false;
402 }
403 }
404 }
405 if need_flush {
406 self.flush();
407 }
408 }
409 Ok(Msg::Flush(o)) => {
410 self.flush();
411 o.call_once(|| {});
412 }
413 Err(_) => {
414 self.flush();
415 return;
417 }
418 }
419 }
420 }
421}