captains_log/
syslog.rs

1//! # Syslog support
2//!
3//! Config for syslog output, supports local and remote server.
4//!
//! The underlying protocol is implemented by the [syslog](https://docs.rs/syslog) crate;
//! currently only `Formatter3164` (RFC 3164) is supported.
7//!
//! To keep socket I/O efficient, each message is sent to a channel
//! and flushed asynchronously by a backend writer thread.
10//!
//! **When your program is shutting down, call flush to ensure the log is written to the socket.**
12//!
13//! ``` rust
14//! log::logger().flush();
15//! ```
16//! On panic, our panic hook will call `flush()` explicitly.
17//!
//! On connection to a unix socket or a udp server, a "syslog: connect to ..." message is
//! printed to stdout.
19//!
//! On remote syslog server failure, the logger does not panic; only a "syslog: flush err" or
//! "syslog: write err" message is printed to stderr, and the backend thread automatically
//! reconnects to the server. To prevent hanging, a message is dropped after a timeout.
23//!
24//! ## connect to local server
25//!
26//! ``` rust
27//! use captains_log::{*, syslog::Facility};
28//! recipe::syslog_local(Facility::LOG_USER, Level::Debug).build().expect("log_setup");
29//! ```
30//! ## connect to remote server
31//!
32//! ``` rust
33//! use captains_log::{*, syslog::*};
34//! let syslog = Syslog::new(Facility::LOG_USER, Level::Info).tcp("10.10.0.1:601");
35//! let _ = Builder::default().add_sink(syslog).build();
36//! ```
37
38use crate::{
39    config::{SinkConfigBuild, SinkConfigTrait},
40    log_impl::{LogSink, LogSinkTrait},
41    time::Timer,
42};
43use crossfire::*;
44use log::{Level, Record};
45use std::hash::{Hash, Hasher};
46use std::io::{BufWriter, Error, ErrorKind, Result, Write};
47use std::net::{TcpStream, ToSocketAddrs, UdpSocket};
48use std::os::unix::net::{UnixDatagram, UnixStream};
49use std::path::{Path, PathBuf};
50use std::sync::{Arc, Once};
51use std::thread;
52use std::time::{Duration, Instant};
53pub use syslog::Facility;
54use syslog::{Formatter3164, LogFormat as SyslogFormat, LoggerBackend as SyslogBackend, Severity};
55
/// Timeout used by default, and substituted when a zero timeout is configured.
const TIMEOUT_DEFAULT: Duration = Duration::from_secs(5);

/// Unix socket paths probed, in this order, when no server address is configured.
// NOTE: `/dev/log` is assumed to be available on the local host — TODO confirm for
// targets other than Linux.
const UNIX_SOCK_PATHS: [&str; 3] = ["/dev/log", "/var/run/syslog", "/var/run/log"];

/// Local TCP endpoint tried when none of `UNIX_SOCK_PATHS` can be connected.
const LOCAL_TCP: &str = "127.0.0.1:601";

/// Local UDP endpoint of a syslog server.
// Unused on purpose: local udp is never connected unless the user asks for it.
#[allow(dead_code)]
const LOCAL_UDP: &str = "127.0.0.1:514";
64
/// Message format used when talking to the syslog server.
///
/// Only RFC 3164 is supported for now: the documentation of the syslog crate does not tell
/// much about how to adapt `Formatter5424` to the log crate.
#[derive(Hash, Clone, Debug, PartialEq, Eq)]
pub enum SyslogProto {
    /// RFC 3164 message format.
    RFC3164,
}
71
/// Address of the syslog server to connect to.
#[derive(Hash, Clone, Debug, PartialEq, Eq)]
pub enum SyslogAddr {
    /// Remote server address, connected through TCP.
    TCP(String),
    /// Local socket address to bind, followed by the remote server address, for UDP.
    UDP(String, String),
    /// Unix socket at the specified path.
    Unix(PathBuf),
}
81
82/// Config for syslog sink, see [module level doc](crate::syslog) for usage:
83pub struct Syslog {
84    /// Syslog facility
85    pub facility: Facility,
86    /// Auto filled current process
87    pub process: Option<String>,
88    /// Auto filled localhost,
89    pub hostname: Option<String>,
90    /// max level of message goes to syslog
91    pub level: Level,
92    /// When in doubt, use RFC3164
93    pub proto: SyslogProto,
94    /// When None, connect local default unix socket.
95    pub server: Option<SyslogAddr>,
96    /// Drop msg when syslog server fail after a timeout, also apply to tcp connect timeout.
97    pub timeout: Duration,
98}
99
100impl Hash for Syslog {
101    fn hash<H: Hasher>(&self, hasher: &mut H) {
102        hasher.write_u32(self.facility as u32);
103        self.process.hash(hasher);
104        self.hostname.hash(hasher);
105        self.level.hash(hasher);
106        self.proto.hash(hasher);
107        self.timeout.hash(hasher);
108        self.server.hash(hasher);
109    }
110}
111
112impl Default for Syslog {
113    fn default() -> Self {
114        Self {
115            proto: SyslogProto::RFC3164,
116            facility: Facility::LOG_USER,
117            process: None,
118            hostname: None,
119            level: Level::Trace,
120            timeout: TIMEOUT_DEFAULT,
121            server: None,
122        }
123    }
124}
125
126impl Syslog {
127    pub fn new(facility: Facility, level: Level) -> Self {
128        let mut s = Self::default();
129        s.proto = SyslogProto::RFC3164;
130        s.facility = facility;
131        s.level = level;
132        s
133    }
134
135    pub fn timeout(mut self, d: Duration) -> Self {
136        self.timeout = d;
137        self
138    }
139
140    /// Set hostname if you don't want the default
141    pub fn hostname(mut self, name: String) -> Self {
142        self.hostname = Some(name);
143        self
144    }
145
146    /// Set process name if you don't want the default
147    pub fn process_name(mut self, name: String) -> Self {
148        self.process = Some(name);
149        self
150    }
151
152    pub fn unix<P: Into<PathBuf>>(mut self, p: P) -> Self {
153        self.server = Some(SyslogAddr::Unix(p.into()));
154        self
155    }
156
157    pub fn tcp<S: AsRef<str>>(mut self, remote: S) -> Self {
158        self.server = Some(SyslogAddr::TCP(remote.as_ref().to_string()));
159        self
160    }
161
162    pub fn udp<S: AsRef<str>>(mut self, local: S, remote: S) -> Self {
163        self.server =
164            Some(SyslogAddr::UDP(local.as_ref().to_string(), remote.as_ref().to_string()));
165        self
166    }
167}
168
169impl SinkConfigBuild for Syslog {
170    fn build(&self) -> LogSink {
171        LogSink::Syslog(LogSinkSyslog::new(self))
172    }
173}
174
175impl SinkConfigTrait for Syslog {
176    fn get_level(&self) -> Level {
177        self.level
178    }
179
180    fn get_file_path(&self) -> Option<Box<Path>> {
181        None
182    }
183
184    fn write_hash(&self, hasher: &mut Box<dyn Hasher>) {
185        self.hash(hasher);
186        hasher.write(b"Syslog");
187    }
188}
189
/// Message passed through the channel to the backend writer thread.
enum Msg {
    /// One formatted message, to be written to the server.
    Line(Vec<u8>),
    /// Flush request. The backend completes the `Once` after flushing,
    /// which releases the requester waiting on it.
    Flush(Arc<Once>),
}
194
195pub(crate) struct LogSinkSyslog {
196    tx: MTx<Msg>,
197    format: Formatter3164,
198    max_level: Level,
199}
200
201impl LogSinkSyslog {
202    fn new(config: &Syslog) -> Self {
203        let (tx, rx) = mpsc::bounded_blocking(100);
204
205        macro_rules! fill_format {
206            ($f: expr, $config: expr) => {{
207                $f.facility = $config.facility;
208                if $config.server.is_some() {
209                    // remote
210                    if let Some(hostname) = &$config.hostname {
211                        $f.hostname = Some(hostname.clone());
212                    }
213                } else {
214                    // local don't need hostname
215                    $f.hostname = None;
216                }
217                if let Some(process) = &$config.process {
218                    $f.process = process.clone();
219                }
220            }};
221        }
222        let mut timeout = config.timeout;
223        if timeout == Duration::from_secs(0) {
224            timeout = TIMEOUT_DEFAULT;
225        }
226        let mut backend = Backend { server: config.server.clone(), timeout, writer: None };
227        let _ = backend.reinit();
228
229        let mut f = Formatter3164::default();
230        fill_format!(f, config);
231        thread::spawn(move || backend.run(rx));
232        Self { tx, max_level: config.level, format: f }
233    }
234}
235
236impl LogSinkTrait for LogSinkSyslog {
237    fn open(&self) -> std::io::Result<()> {
238        Ok(())
239    }
240
241    fn reopen(&self) -> std::io::Result<()> {
242        Ok(())
243    }
244
245    #[inline(always)]
246    fn log(&self, _now: &Timer, r: &Record) {
247        let l = r.level();
248        if r.level() <= self.max_level {
249            let mut buf = Vec::with_capacity(128);
250            let _level = match l {
251                Level::Trace => Severity::LOG_DEBUG, // syslog don't have trace level
252                Level::Debug => Severity::LOG_DEBUG,
253                Level::Info => Severity::LOG_INFO,
254                Level::Warn => Severity::LOG_WARNING,
255                Level::Error => Severity::LOG_ERR,
256            };
257            let msg = format!("{}", r.args());
258            self.format.format(&mut buf, _level, msg).expect("format");
259            let _ = self.tx.send(Msg::Line(buf));
260        }
261    }
262
263    #[inline(always)]
264    fn flush(&self) {
265        let o = Arc::new(Once::new());
266        if self.tx.send(Msg::Flush(o.clone())).is_ok() {
267            o.wait();
268        }
269    }
270}
271
272struct Backend {
273    server: Option<SyslogAddr>,
274    writer: Option<SyslogBackend>,
275    timeout: Duration,
276}
277
278impl Backend {
279    #[inline]
280    fn connect_unix(path: &Path) -> Result<SyslogBackend> {
281        let sock = UnixDatagram::unbound()?;
282        match sock.connect(Path::new(path)) {
283            Ok(()) => {
284                println!("syslog: connect to unix {:?}", path);
285                return Ok(SyslogBackend::Unix(sock));
286            }
287            Err(e) => {
288                if e.raw_os_error() == Some(libc::EPROTOTYPE) {
289                    let sock = UnixStream::connect(path)?;
290                    println!("syslog: connect to unix {:?}", path);
291                    return Ok(SyslogBackend::UnixStream(BufWriter::new(sock)));
292                }
293                return Err(e);
294            }
295        }
296    }
297
298    #[inline]
299    fn connect_tcp(s: &str, timeout: Duration) -> Result<SyslogBackend> {
300        for addr in s.to_socket_addrs()? {
301            let socket = TcpStream::connect_timeout(&addr, timeout)?;
302            return Ok(SyslogBackend::Tcp(BufWriter::new(socket)));
303        }
304        Err(Error::new(ErrorKind::NotFound, "syslog: no server address").into())
305    }
306
307    #[inline]
308    fn connect_udp(local: &str, remote: &str) -> Result<SyslogBackend> {
309        let server_addr = remote.to_socket_addrs().and_then(|mut server_addr_opt| {
310            server_addr_opt
311                .next()
312                .ok_or_else(|| Error::new(ErrorKind::NotFound, "syslog: no server address").into())
313        })?;
314        println!("syslog: connect to udp {:?}", remote);
315        let socket = UdpSocket::bind(local)?;
316        return Ok(SyslogBackend::Udp(socket, server_addr));
317    }
318
319    fn connect(server: &Option<SyslogAddr>, timeout: Duration) -> Result<SyslogBackend> {
320        match server {
321            Some(SyslogAddr::Unix(p)) => Self::connect_unix(p.as_path()),
322            Some(SyslogAddr::UDP(local, remote)) => Self::connect_udp(&local, &remote),
323            Some(SyslogAddr::TCP(remote)) => Self::connect_tcp(&remote, timeout),
324            None => {
325                for p in &UNIX_SOCK_PATHS {
326                    if let Ok(backend) = Self::connect_unix(Path::new(p)) {
327                        return Ok(backend);
328                    }
329                }
330                return Self::connect_tcp(LOCAL_TCP, timeout);
331                // Self::connect_udp("127.0.0.1:0", "127.0.0.1:514")
332                // XXX: do not connect local udp unless specified by user,
333                // since we have no means to detect udp failure
334            }
335        }
336    }
337
338    #[inline(always)]
339    fn reinit(&mut self) -> Result<()> {
340        match Self::connect(&self.server, self.timeout) {
341            Err(e) => {
342                eprintln!("syslog: connect {:?} err {:?}", e, self.server);
343                return Err(e);
344            }
345            Ok(backend) => {
346                self.writer = Some(backend);
347                Ok(())
348            }
349        }
350    }
351
352    #[inline(always)]
353    fn flush(&mut self) {
354        if let Some(writer) = self.writer.as_mut() {
355            if let Err(e) = writer.flush() {
356                eprintln!("syslog: flush err {:?}", e);
357                self.writer = None;
358            }
359        }
360    }
361
362    #[inline]
363    fn write(&mut self, msg: &[u8]) {
364        if let Some(writer) = self.writer.as_mut() {
365            match writer.write_all(msg) {
366                Ok(_) => return,
367                Err(e) => {
368                    eprintln!("syslog: write err {:?}", e);
369                    self.writer = None;
370                }
371            }
372        }
373        let start_ts = Instant::now();
374        loop {
375            thread::sleep(Duration::from_millis(500));
376            if self.reinit().is_ok() {
377                if let Some(writer) = self.writer.as_mut() {
378                    match writer.write_all(msg) {
379                        Ok(_) => return,
380                        Err(e) => {
381                            eprintln!("syslog: write err {:?}", e);
382                            self.writer = None;
383                        }
384                    }
385                }
386            }
387            if Instant::now().duration_since(start_ts) > self.timeout {
388                // give up
389                return;
390            }
391        }
392    }
393
394    fn run(&mut self, rx: Rx<Msg>) {
395        loop {
396            match rx.recv() {
397                Ok(Msg::Line(_msg)) => {
398                    self.write(&_msg);
399                    let mut need_flush = true;
400                    while let Ok(msg) = rx.try_recv() {
401                        match msg {
402                            Msg::Line(_msg) => self.write(&_msg),
403                            Msg::Flush(o) => {
404                                self.flush();
405                                o.call_once(|| {});
406                                need_flush = false;
407                            }
408                        }
409                    }
410                    if need_flush {
411                        self.flush();
412                    }
413                }
414                Ok(Msg::Flush(o)) => {
415                    self.flush();
416                    o.call_once(|| {});
417                }
418                Err(_) => {
419                    self.flush();
420                    // exit
421                    return;
422                }
423            }
424        }
425    }
426}