captains_log/
syslog.rs

1use crate::{
2    config::{SinkConfigBuild, SinkConfigTrait},
3    log_impl::{LogSink, LogSinkTrait},
4    time::Timer,
5};
6use crossfire::*;
7use log::{Level, Record};
8use std::hash::{Hash, Hasher};
9use std::io::{BufWriter, Error, ErrorKind, Result, Write};
10use std::net::{TcpStream, ToSocketAddrs, UdpSocket};
11use std::os::unix::net::{UnixDatagram, UnixStream};
12use std::path::{Path, PathBuf};
13use std::sync::{Arc, Once};
14use std::thread;
15use std::time::{Duration, Instant};
16pub use syslog::Facility;
17use syslog::{Formatter3164, LogFormat as SyslogFormat, LoggerBackend as SyslogBackend, Severity};
18
/// Default timeout, used when the configured timeout is zero.
const TIMEOUT_DEFAULT: Duration = Duration::from_secs(5);
/// Local syslog unix socket paths, probed in this order when no server is configured.
const UNIX_SOCK_PATHS: [&str; 3] = ["/dev/log", "/var/run/syslog", "/var/run/log"];
// NOTE: local /dev/log is always available

/// Local TCP address tried after every path in `UNIX_SOCK_PATHS` has failed.
const LOCAL_TCP: &str = "127.0.0.1:601";

/// Local UDP address; not referenced by the code in this file, hence the `dead_code` allowance.
#[allow(dead_code)]
const LOCAL_UDP: &str = "127.0.0.1:514";
27
/// Wire format used for messages sent to the syslog server.
///
/// Only RFC 3164 is available at the moment.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum SyslogProto {
    /// The BSD syslog protocol (RFC 3164).
    RFC3164,
}
// NOTE: the documentation of the syslog crate says little about how to adapt Formatter5424
// to the log crate, so only RFC 3164 is supported for now.
34
/// Address of the syslog server, one variant per supported transport.
#[derive(Hash, Clone, Debug)]
pub enum SyslogAddr {
    /// TCP transport: the remote server address, e.g. `"10.10.0.1:601"`.
    TCP(String),
    /// UDP transport: the local address to bind, followed by the remote server address.
    UDP(String, String),
    /// Unix domain socket at the specified path.
    Unix(PathBuf),
}
44
/// Config for syslog output; supports local and remote servers.
///
/// The underlying protocol is implemented by the [syslog](https://docs.rs/syslog) crate;
/// currently only `Formatter3164` is adapted.
///
/// To achieve efficient socket I/O, each message is sent to a channel
/// and flushed asynchronously by a backend writer thread.
///
/// **When your program is shutting down, call flush to ensure the log is written to the socket.**
///
/// ``` rust
/// log::logger().flush();
/// ```
/// On panic, our panic hook will call `flush()` explicitly.
///
/// When connecting over a unix socket or UDP, a "syslog: connect to ..." message is printed
/// to stdout.
///
/// A remote syslog server failure does not cause a panic; only messages such as
/// "syslog: flush err" are printed to stderr, and the backend thread automatically
/// reconnects to the server.
/// To avoid hanging, a message is dropped once the timeout has elapsed.
///
/// # Example connecting local server
///
/// Source of [crate::recipe::syslog_local()]
///
/// ``` rust
/// use captains_log::*;
/// pub fn syslog_local(max_level: Level) -> Builder {
///     let syslog = Syslog::new(Facility::LOG_USER, max_level);
///     return Builder::default().add_sink(syslog);
/// }
/// ```
/// # Example connecting remote server
///
/// ``` rust
/// use captains_log::*;
/// let syslog = Syslog::new(Facility::LOG_USER, Level::Info).tcp("10.10.0.1:601");
/// let _ = Builder::default().add_sink(syslog).build();
/// ```
pub struct Syslog {
    /// Syslog facility
    pub facility: Facility,
    /// Process name; auto-filled with the current process when `None`.
    pub process: Option<String>,
    /// Hostname; auto-filled with the local host when `None`.
    /// Only used when `server` is set: messages for the local default socket carry no hostname.
    pub hostname: Option<String>,
    /// Maximum level of the messages that go to syslog.
    pub level: Level,
    /// When in doubt, use RFC3164
    pub proto: SyslogProto,
    /// When `None`, connect to the local default unix socket
    /// (falling back to local TCP `127.0.0.1:601` if no unix socket can be connected).
    pub server: Option<SyslogAddr>,
    /// How long to keep retrying a failing syslog server before the message is dropped;
    /// also applied as the TCP connect timeout. A zero duration selects the 5 second default.
    pub timeout: Duration,
}
100
101impl Hash for Syslog {
102    fn hash<H: Hasher>(&self, hasher: &mut H) {
103        hasher.write_u32(self.facility as u32);
104        self.process.hash(hasher);
105        self.hostname.hash(hasher);
106        self.level.hash(hasher);
107        self.proto.hash(hasher);
108        self.timeout.hash(hasher);
109        self.server.hash(hasher);
110    }
111}
112
113impl Default for Syslog {
114    fn default() -> Self {
115        Self {
116            proto: SyslogProto::RFC3164,
117            facility: Facility::LOG_USER,
118            process: None,
119            hostname: None,
120            level: Level::Trace,
121            timeout: TIMEOUT_DEFAULT,
122            server: None,
123        }
124    }
125}
126
127impl Syslog {
128    pub fn new(facility: Facility, level: Level) -> Self {
129        let mut s = Self::default();
130        s.proto = SyslogProto::RFC3164;
131        s.facility = facility;
132        s.level = level;
133        s
134    }
135
136    pub fn timeout(mut self, d: Duration) -> Self {
137        self.timeout = d;
138        self
139    }
140
141    /// Set hostname if you don't want the default
142    pub fn hostname(mut self, name: String) -> Self {
143        self.hostname = Some(name);
144        self
145    }
146
147    /// Set process name if you don't want the default
148    pub fn process_name(mut self, name: String) -> Self {
149        self.process = Some(name);
150        self
151    }
152
153    pub fn unix<P: Into<PathBuf>>(mut self, p: P) -> Self {
154        self.server = Some(SyslogAddr::Unix(p.into()));
155        self
156    }
157
158    pub fn tcp<S: AsRef<str>>(mut self, remote: S) -> Self {
159        self.server = Some(SyslogAddr::TCP(remote.as_ref().to_string()));
160        self
161    }
162
163    pub fn udp<S: AsRef<str>>(mut self, local: S, remote: S) -> Self {
164        self.server =
165            Some(SyslogAddr::UDP(local.as_ref().to_string(), remote.as_ref().to_string()));
166        self
167    }
168}
169
170impl SinkConfigBuild for Syslog {
171    fn build(&self) -> LogSink {
172        LogSink::Syslog(LogSinkSyslog::new(self))
173    }
174}
175
176impl SinkConfigTrait for Syslog {
177    fn get_level(&self) -> Level {
178        self.level
179    }
180
181    fn get_file_path(&self) -> Option<Box<Path>> {
182        None
183    }
184
185    fn write_hash(&self, hasher: &mut Box<dyn Hasher>) {
186        self.hash(hasher);
187        hasher.write(b"Syslog");
188    }
189}
190
/// Message passed from the sink to the backend writer thread.
enum Msg {
    /// One formatted syslog message, ready to be written to the connection.
    Line(Vec<u8>),
    /// Flush request: the backend flushes its writer, then completes the `Once`
    /// so that the caller blocked in `wait()` can resume.
    Flush(Arc<Once>),
}
195
/// Syslog sink: formats each record and hands it to the backend thread through a channel.
pub(crate) struct LogSinkSyslog {
    /// Sending side of the channel consumed by `Backend::run`.
    tx: MTx<Msg>,
    /// Formatter filled from the `Syslog` config (facility, hostname, process).
    format: Formatter3164,
    /// Only records with `level <= max_level` are sent.
    max_level: Level,
}
201
202impl LogSinkSyslog {
203    fn new(config: &Syslog) -> Self {
204        let (tx, rx) = mpsc::bounded_blocking(100);
205
206        macro_rules! fill_format {
207            ($f: expr, $config: expr) => {{
208                $f.facility = $config.facility;
209                if $config.server.is_some() {
210                    // remote
211                    if let Some(hostname) = &$config.hostname {
212                        $f.hostname = Some(hostname.clone());
213                    }
214                } else {
215                    // local don't need hostname
216                    $f.hostname = None;
217                }
218                if let Some(process) = &$config.process {
219                    $f.process = process.clone();
220                }
221            }};
222        }
223        let mut timeout = config.timeout;
224        if timeout == Duration::from_secs(0) {
225            timeout = TIMEOUT_DEFAULT;
226        }
227        let mut backend = Backend { server: config.server.clone(), timeout, writer: None };
228        let _ = backend.reinit();
229
230        let mut f = Formatter3164::default();
231        fill_format!(f, config);
232        thread::spawn(move || backend.run(rx));
233        Self { tx, max_level: config.level, format: f }
234    }
235}
236
237impl LogSinkTrait for LogSinkSyslog {
238    fn open(&self) -> std::io::Result<()> {
239        Ok(())
240    }
241
242    fn reopen(&self) -> std::io::Result<()> {
243        Ok(())
244    }
245
246    #[inline(always)]
247    fn log(&self, _now: &Timer, r: &Record) {
248        let l = r.level();
249        if r.level() <= self.max_level {
250            let mut buf = Vec::with_capacity(128);
251            let _level = match l {
252                Level::Trace => Severity::LOG_DEBUG, // syslog don't have trace level
253                Level::Debug => Severity::LOG_DEBUG,
254                Level::Info => Severity::LOG_INFO,
255                Level::Warn => Severity::LOG_WARNING,
256                Level::Error => Severity::LOG_ERR,
257            };
258            let msg = format!("{}", r.args());
259            self.format.format(&mut buf, _level, msg).expect("format");
260            let _ = self.tx.send(Msg::Line(buf));
261        }
262    }
263
264    #[inline(always)]
265    fn flush(&self) {
266        let o = Arc::new(Once::new());
267        if self.tx.send(Msg::Flush(o.clone())).is_ok() {
268            o.wait();
269        }
270    }
271}
272
/// State of the backend writer that is run on a dedicated thread.
struct Backend {
    /// Configured server; `None` means the local default sockets are probed.
    server: Option<SyslogAddr>,
    /// Current connection; `None` while disconnected (reset after a write or flush error).
    writer: Option<SyslogBackend>,
    /// Retry budget for a single message, also passed as the TCP connect timeout.
    timeout: Duration,
}
278
279impl Backend {
280    #[inline]
281    fn connect_unix(path: &Path) -> Result<SyslogBackend> {
282        let sock = UnixDatagram::unbound()?;
283        match sock.connect(Path::new(path)) {
284            Ok(()) => {
285                println!("syslog: connect to unix {:?}", path);
286                return Ok(SyslogBackend::Unix(sock));
287            }
288            Err(e) => {
289                if e.raw_os_error() == Some(libc::EPROTOTYPE) {
290                    let sock = UnixStream::connect(path)?;
291                    println!("syslog: connect to unix {:?}", path);
292                    return Ok(SyslogBackend::UnixStream(BufWriter::new(sock)));
293                }
294                return Err(e);
295            }
296        }
297    }
298
299    #[inline]
300    fn connect_tcp(s: &str, timeout: Duration) -> Result<SyslogBackend> {
301        for addr in s.to_socket_addrs()? {
302            let socket = TcpStream::connect_timeout(&addr, timeout)?;
303            return Ok(SyslogBackend::Tcp(BufWriter::new(socket)));
304        }
305        Err(Error::new(ErrorKind::NotFound, "syslog: no server address").into())
306    }
307
308    #[inline]
309    fn connect_udp(local: &str, remote: &str) -> Result<SyslogBackend> {
310        let server_addr = remote.to_socket_addrs().and_then(|mut server_addr_opt| {
311            server_addr_opt
312                .next()
313                .ok_or_else(|| Error::new(ErrorKind::NotFound, "syslog: no server address").into())
314        })?;
315        println!("syslog: connect to udp {:?}", remote);
316        let socket = UdpSocket::bind(local)?;
317        return Ok(SyslogBackend::Udp(socket, server_addr));
318    }
319
320    fn connect(server: &Option<SyslogAddr>, timeout: Duration) -> Result<SyslogBackend> {
321        match server {
322            Some(SyslogAddr::Unix(p)) => Self::connect_unix(p.as_path()),
323            Some(SyslogAddr::UDP(local, remote)) => Self::connect_udp(&local, &remote),
324            Some(SyslogAddr::TCP(remote)) => Self::connect_tcp(&remote, timeout),
325            None => {
326                for p in &UNIX_SOCK_PATHS {
327                    if let Ok(backend) = Self::connect_unix(Path::new(p)) {
328                        return Ok(backend);
329                    }
330                }
331                return Self::connect_tcp(LOCAL_TCP, timeout);
332                // Self::connect_udp("127.0.0.1:0", "127.0.0.1:514")
333                // XXX: do not connect local udp unless specified by user,
334                // since we have no means to detect udp failure
335            }
336        }
337    }
338
339    #[inline(always)]
340    fn reinit(&mut self) -> Result<()> {
341        match Self::connect(&self.server, self.timeout) {
342            Err(e) => {
343                eprintln!("syslog: connect {:?} err {:?}", e, self.server);
344                return Err(e);
345            }
346            Ok(backend) => {
347                self.writer = Some(backend);
348                Ok(())
349            }
350        }
351    }
352
353    #[inline(always)]
354    fn flush(&mut self) {
355        if let Some(writer) = self.writer.as_mut() {
356            if let Err(e) = writer.flush() {
357                eprintln!("syslog: flush err {:?}", e);
358                self.writer = None;
359            }
360        }
361    }
362
363    #[inline]
364    fn write(&mut self, msg: &[u8]) {
365        if let Some(writer) = self.writer.as_mut() {
366            match writer.write_all(msg) {
367                Ok(_) => return,
368                Err(e) => {
369                    eprintln!("syslog: write err {:?}", e);
370                    self.writer = None;
371                }
372            }
373        }
374        let start_ts = Instant::now();
375        loop {
376            thread::sleep(Duration::from_millis(500));
377            if self.reinit().is_ok() {
378                if let Some(writer) = self.writer.as_mut() {
379                    match writer.write_all(msg) {
380                        Ok(_) => return,
381                        Err(e) => {
382                            eprintln!("syslog: write err {:?}", e);
383                            self.writer = None;
384                        }
385                    }
386                }
387            }
388            if Instant::now().duration_since(start_ts) > self.timeout {
389                // give up
390                return;
391            }
392        }
393    }
394
395    fn run(&mut self, rx: Rx<Msg>) {
396        loop {
397            match rx.recv() {
398                Ok(Msg::Line(_msg)) => {
399                    self.write(&_msg);
400                    let mut need_flush = true;
401                    while let Ok(msg) = rx.try_recv() {
402                        match msg {
403                            Msg::Line(_msg) => self.write(&_msg),
404                            Msg::Flush(o) => {
405                                self.flush();
406                                o.call_once(|| {});
407                                need_flush = false;
408                            }
409                        }
410                    }
411                    if need_flush {
412                        self.flush();
413                    }
414                }
415                Ok(Msg::Flush(o)) => {
416                    self.flush();
417                    o.call_once(|| {});
418                }
419                Err(_) => {
420                    self.flush();
421                    // exit
422                    return;
423                }
424            }
425        }
426    }
427}