captains_log/
syslog.rs

1use crate::{
2    config::SinkConfigTrait,
3    log_impl::{LogSink, LogSinkTrait},
4    time::Timer,
5};
6use crossfire::*;
7use log::{Level, Record};
8use std::hash::{Hash, Hasher};
9use std::io::{BufWriter, Error, ErrorKind, Result, Write};
10use std::net::{TcpStream, ToSocketAddrs, UdpSocket};
11use std::os::unix::net::{UnixDatagram, UnixStream};
12use std::path::{Path, PathBuf};
13use std::sync::{Arc, Once};
14use std::thread;
15use std::time::{Duration, Instant};
16pub use syslog::Facility;
17use syslog::{Formatter3164, LogFormat as SyslogFormat, LoggerBackend as SyslogBackend, Severity};
18
/// Timeout applied when the configured timeout is zero.
const TIMEOUT_DEFAULT: Duration = Duration::from_secs(5);

/// Unix socket paths probed, in this order, when no server is configured.
// NOTE: local /dev/log is always available
const UNIX_SOCK_PATHS: [&str; 3] = ["/dev/log", "/var/run/syslog", "/var/run/log"];

/// Local TCP endpoint tried after every path in `UNIX_SOCK_PATHS` has failed.
const LOCAL_TCP: &str = "127.0.0.1:601";

/// Local UDP endpoint; kept for reference, it is never connected implicitly.
#[allow(dead_code)]
const LOCAL_UDP: &str = "127.0.0.1:514";
27
/// Wire format used for syslog messages.
///
/// Only RFC 3164 is supported for now: the documentation of the `syslog` crate
/// does not tell much about how to adapt `Formatter5424` to the `log` crate.
///
/// The common traits are derived so that the value can be printed, copied and
/// compared like any other plain configuration flag.
#[derive(Hash, Debug, Clone, Copy, PartialEq, Eq)]
pub enum SyslogProto {
    /// The BSD syslog protocol (RFC 3164).
    RFC3164,
}
34
/// Address of the syslog endpoint to connect to.
#[derive(Hash, Clone, Debug)]
pub enum SyslogAddr {
    /// TCP transport: address of the remote server.
    TCP(String),
    /// UDP transport: local address to bind, then address of the remote server.
    UDP(String, String),
    /// Unix domain socket at the given path.
    Unix(PathBuf),
}
44
45/// Config for syslog output, supports local and remote server.
46///
47/// The underlayer protocol is implemented by [syslog](https://docs.rs/syslog) crate,
48/// currently Formatter3164 is adapted.
49///
50/// In order to achieve efficient socket I/O, the message is sent to channel,
51/// and asynchronous flushed by backend writer.
52///
53/// **When your program shutting down, should call flush to ensure the log is written to the socket.**
54///
55/// ``` rust
56/// log::logger().flush();
57/// ```
58/// On panic, our panic hook will call `flush()` explicitly.
59///
60/// On connection, will output "syslog connected" message to stdout.
61///
62/// On remote syslog server failure, will not panic, only "syslog: flush err" message will be print
63/// to stderr, the backend thread will automatically reconnect to server.
64/// In order to prevent hang up, the message will be dropped after a timeout.
65///
66/// # Example connecting local server
67///
68/// Source of [crate::recipe::syslog_local()]
69///
70/// ``` rust
71/// use captains_log::*;
72/// pub fn syslog_local(max_level: Level) -> Builder {
73///     let syslog = Syslog::new(Facility::LOG_USER, max_level);
74///     return Builder::default().syslog(syslog);
75/// }
76/// ```
77/// # Example connecting remote server
78///
79/// ``` rust
80/// use captains_log::*;
81/// let syslog = Syslog::new(Facility::LOG_USER, Level::Info).tcp("10.10.0.1:601");
82/// let _ = Builder::default().syslog(syslog).build();
83/// ```
84pub struct Syslog {
85    /// Syslog facility
86    pub facility: Facility,
87    /// Auto filled current process
88    pub process: Option<String>,
89    /// Auto filled localhost,
90    pub hostname: Option<String>,
91    /// max level of message goes to syslog
92    pub level: Level,
93    /// When in doubt, use RFC3164
94    pub proto: SyslogProto,
95    /// When None, connect local default unix socket.
96    pub server: Option<SyslogAddr>,
97    /// Drop msg when syslog server fail after a timeout, also apply to tcp connect timeout.
98    pub timeout: Duration,
99}
100
101impl Hash for Syslog {
102    fn hash<H: Hasher>(&self, hasher: &mut H) {
103        hasher.write_u32(self.facility as u32);
104        self.process.hash(hasher);
105        self.hostname.hash(hasher);
106        self.level.hash(hasher);
107        self.proto.hash(hasher);
108        self.timeout.hash(hasher);
109        self.server.hash(hasher);
110    }
111}
112
113impl Default for Syslog {
114    fn default() -> Self {
115        Self {
116            proto: SyslogProto::RFC3164,
117            facility: Facility::LOG_USER,
118            process: None,
119            hostname: None,
120            level: Level::Trace,
121            timeout: TIMEOUT_DEFAULT,
122            server: None,
123        }
124    }
125}
126
127impl Syslog {
128    pub fn new(facility: Facility, level: Level) -> Self {
129        let mut s = Self::default();
130        s.proto = SyslogProto::RFC3164;
131        s.facility = facility;
132        s.level = level;
133        s
134    }
135
136    pub fn timeout(mut self, d: Duration) -> Self {
137        self.timeout = d;
138        self
139    }
140
141    /// Set hostname if you don't want the default
142    pub fn hostname(mut self, name: String) -> Self {
143        self.hostname = Some(name);
144        self
145    }
146
147    /// Set process name if you don't want the default
148    pub fn process_name(mut self, name: String) -> Self {
149        self.process = Some(name);
150        self
151    }
152
153    pub fn unix<P: Into<PathBuf>>(mut self, p: P) -> Self {
154        self.server = Some(SyslogAddr::Unix(p.into()));
155        self
156    }
157
158    pub fn tcp<S: AsRef<str>>(mut self, remote: S) -> Self {
159        self.server = Some(SyslogAddr::TCP(remote.as_ref().to_string()));
160        self
161    }
162
163    pub fn udp<S: AsRef<str>>(mut self, local: S, remote: S) -> Self {
164        self.server =
165            Some(SyslogAddr::UDP(local.as_ref().to_string(), remote.as_ref().to_string()));
166        self
167    }
168}
169
170impl SinkConfigTrait for Syslog {
171    fn get_level(&self) -> Level {
172        self.level
173    }
174
175    fn get_file_path(&self) -> Option<Box<Path>> {
176        None
177    }
178
179    fn write_hash(&self, hasher: &mut Box<dyn Hasher>) {
180        self.hash(hasher);
181        hasher.write(b"Syslog");
182    }
183
184    fn build(&self) -> LogSink {
185        LogSink::Syslog(LogSinkSyslog::new(self))
186    }
187}
188
/// Message sent from the sink to the backend thread.
enum Msg {
    /// One fully formatted syslog message to write.
    Line(Vec<u8>),
    /// Flush request; the backend completes the `Once` after flushing so the
    /// requester can wait on it.
    Flush(Arc<Once>),
}
193
194pub(crate) struct LogSinkSyslog {
195    tx: MTx<Msg>,
196    format: Formatter3164,
197    max_level: Level,
198}
199
200impl LogSinkSyslog {
201    fn new(config: &Syslog) -> Self {
202        let (tx, rx) = mpsc::bounded_blocking(100);
203
204        macro_rules! fill_format {
205            ($f: expr, $config: expr) => {{
206                $f.facility = $config.facility;
207                if $config.server.is_some() {
208                    // remote
209                    if let Some(hostname) = &$config.hostname {
210                        $f.hostname = Some(hostname.clone());
211                    }
212                } else {
213                    // local don't need hostname
214                    $f.hostname = None;
215                }
216                if let Some(process) = &$config.process {
217                    $f.process = process.clone();
218                }
219            }};
220        }
221        let mut timeout = config.timeout;
222        if timeout == Duration::from_secs(0) {
223            timeout = TIMEOUT_DEFAULT;
224        }
225        let mut backend = Backend { server: config.server.clone(), timeout, writer: None };
226        let _ = backend.reinit();
227
228        let mut f = Formatter3164::default();
229        fill_format!(f, config);
230        thread::spawn(move || backend.run(rx));
231        Self { tx, max_level: config.level, format: f }
232    }
233}
234
235impl LogSinkTrait for LogSinkSyslog {
236    fn reopen(&self) -> std::io::Result<()> {
237        Ok(())
238    }
239
240    #[inline(always)]
241    fn log(&self, _now: &Timer, r: &Record) {
242        let l = r.level();
243        if r.level() <= self.max_level {
244            let mut buf = Vec::with_capacity(128);
245            let _level = match l {
246                Level::Trace => Severity::LOG_DEBUG, // syslog don't have trace level
247                Level::Debug => Severity::LOG_DEBUG,
248                Level::Info => Severity::LOG_INFO,
249                Level::Warn => Severity::LOG_WARNING,
250                Level::Error => Severity::LOG_ERR,
251            };
252            let msg = format!("{}", r.args());
253            self.format.format(&mut buf, _level, msg).expect("format");
254            let _ = self.tx.send(Msg::Line(buf));
255        }
256    }
257
258    #[inline(always)]
259    fn flush(&self) {
260        let o = Arc::new(Once::new());
261        if self.tx.send(Msg::Flush(o.clone())).is_ok() {
262            o.wait();
263        }
264    }
265}
266
267struct Backend {
268    server: Option<SyslogAddr>,
269    writer: Option<SyslogBackend>,
270    timeout: Duration,
271}
272
273impl Backend {
274    #[inline]
275    fn connect_unix(path: &Path) -> Result<SyslogBackend> {
276        let sock = UnixDatagram::unbound()?;
277        match sock.connect(Path::new(path)) {
278            Ok(()) => {
279                println!("syslog: connect to unix {:?}", path);
280                return Ok(SyslogBackend::Unix(sock));
281            }
282            Err(e) => {
283                if e.raw_os_error() == Some(libc::EPROTOTYPE) {
284                    let sock = UnixStream::connect(path)?;
285                    println!("syslog: connect to unix {:?}", path);
286                    return Ok(SyslogBackend::UnixStream(BufWriter::new(sock)));
287                }
288                return Err(e);
289            }
290        }
291    }
292
293    #[inline]
294    fn connect_tcp(s: &str, timeout: Duration) -> Result<SyslogBackend> {
295        for addr in s.to_socket_addrs()? {
296            let socket = TcpStream::connect_timeout(&addr, timeout)?;
297            return Ok(SyslogBackend::Tcp(BufWriter::new(socket)));
298        }
299        Err(Error::new(ErrorKind::NotFound, "syslog: no server address").into())
300    }
301
302    #[inline]
303    fn connect_udp(local: &str, remote: &str) -> Result<SyslogBackend> {
304        let server_addr = remote.to_socket_addrs().and_then(|mut server_addr_opt| {
305            server_addr_opt
306                .next()
307                .ok_or_else(|| Error::new(ErrorKind::NotFound, "syslog: no server address").into())
308        })?;
309        println!("syslog: connect to udp {:?}", remote);
310        let socket = UdpSocket::bind(local)?;
311        return Ok(SyslogBackend::Udp(socket, server_addr));
312    }
313
314    fn connect(server: &Option<SyslogAddr>, timeout: Duration) -> Result<SyslogBackend> {
315        match server {
316            Some(SyslogAddr::Unix(p)) => Self::connect_unix(p.as_path()),
317            Some(SyslogAddr::UDP(local, remote)) => Self::connect_udp(&local, &remote),
318            Some(SyslogAddr::TCP(remote)) => Self::connect_tcp(&remote, timeout),
319            None => {
320                for p in &UNIX_SOCK_PATHS {
321                    if let Ok(backend) = Self::connect_unix(Path::new(p)) {
322                        return Ok(backend);
323                    }
324                }
325                return Self::connect_tcp(LOCAL_TCP, timeout);
326                // Self::connect_udp("127.0.0.1:0", "127.0.0.1:514")
327                // XXX: do not connect local udp unless specified by user,
328                // since we have no means to detect udp failure
329            }
330        }
331    }
332
333    #[inline(always)]
334    fn reinit(&mut self) -> Result<()> {
335        match Self::connect(&self.server, self.timeout) {
336            Err(e) => {
337                eprintln!("syslog: connect {:?} err {:?}", e, self.server);
338                return Err(e);
339            }
340            Ok(backend) => {
341                self.writer = Some(backend);
342                Ok(())
343            }
344        }
345    }
346
347    #[inline(always)]
348    fn flush(&mut self) {
349        if let Some(writer) = self.writer.as_mut() {
350            if let Err(e) = writer.flush() {
351                eprintln!("syslog: flush err {:?}", e);
352                self.writer = None;
353            }
354        }
355    }
356
357    #[inline]
358    fn write(&mut self, msg: &[u8]) {
359        if let Some(writer) = self.writer.as_mut() {
360            match writer.write_all(msg) {
361                Ok(_) => return,
362                Err(e) => {
363                    eprintln!("syslog: write err {:?}", e);
364                    self.writer = None;
365                }
366            }
367        }
368        let start_ts = Instant::now();
369        loop {
370            thread::sleep(Duration::from_millis(500));
371            if self.reinit().is_ok() {
372                if let Some(writer) = self.writer.as_mut() {
373                    match writer.write_all(msg) {
374                        Ok(_) => return,
375                        Err(e) => {
376                            eprintln!("syslog: write err {:?}", e);
377                            self.writer = None;
378                        }
379                    }
380                }
381            }
382            if Instant::now().duration_since(start_ts) > self.timeout {
383                // give up
384                return;
385            }
386        }
387    }
388
389    fn run(&mut self, rx: Rx<Msg>) {
390        loop {
391            match rx.recv() {
392                Ok(Msg::Line(_msg)) => {
393                    self.write(&_msg);
394                    let mut need_flush = true;
395                    while let Ok(msg) = rx.try_recv() {
396                        match msg {
397                            Msg::Line(_msg) => self.write(&_msg),
398                            Msg::Flush(o) => {
399                                self.flush();
400                                o.call_once(|| {});
401                                need_flush = false;
402                            }
403                        }
404                    }
405                    if need_flush {
406                        self.flush();
407                    }
408                }
409                Ok(Msg::Flush(o)) => {
410                    self.flush();
411                    o.call_once(|| {});
412                }
413                Err(_) => {
414                    self.flush();
415                    // exit
416                    return;
417                }
418            }
419        }
420    }
421}