1use crate::{
39 config::{SinkConfigBuild, SinkConfigTrait},
40 log_impl::{LogSink, LogSinkTrait},
41 time::Timer,
42};
43use crossfire::*;
44use log::{Level, Record};
45use std::hash::{Hash, Hasher};
46use std::io::{BufWriter, Error, ErrorKind, Result, Write};
47use std::net::{TcpStream, ToSocketAddrs, UdpSocket};
48use std::os::unix::net::{UnixDatagram, UnixStream};
49use std::path::{Path, PathBuf};
50use std::sync::{Arc, Once};
51use std::thread;
52use std::time::{Duration, Instant};
53pub use syslog::Facility;
54use syslog::{Formatter3164, LogFormat as SyslogFormat, LoggerBackend as SyslogBackend, Severity};
55
/// Timeout used by `Syslog::default()` and substituted whenever a zero timeout is configured.
const TIMEOUT_DEFAULT: Duration = Duration::from_secs(5);
/// Unix socket paths probed, in this order, when no server address is configured.
const UNIX_SOCK_PATHS: [&str; 3] = ["/dev/log", "/var/run/syslog", "/var/run/log"];
/// Loopback TCP endpoint tried once every path in `UNIX_SOCK_PATHS` has failed.
const LOCAL_TCP: &str = "127.0.0.1:601";

// Loopback UDP endpoint. Nothing visible in this file references it, hence the
// lint suppression. NOTE(review): confirm whether it is still needed.
#[allow(dead_code)]
const LOCAL_UDP: &str = "127.0.0.1:514";
64
/// Message format used when talking to the syslog server.
///
/// A single variant exists today; the type is a plain, field-less selector, so it
/// is cheap to copy, compare and print.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
pub enum SyslogProto {
    /// RFC 3164 message format.
    RFC3164,
}
/// Where the syslog server is reached.
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
pub enum SyslogAddr {
    /// TCP connection to the given remote address (resolved with `ToSocketAddrs`).
    TCP(String),
    /// UDP transport: `(local bind address, remote server address)`.
    UDP(String, String),
    /// Unix domain socket at the given path.
    Unix(PathBuf),
}
81
82pub struct Syslog {
84 pub facility: Facility,
86 pub process: Option<String>,
88 pub hostname: Option<String>,
90 pub level: Level,
92 pub proto: SyslogProto,
94 pub server: Option<SyslogAddr>,
96 pub timeout: Duration,
98}
99
100impl Hash for Syslog {
101 fn hash<H: Hasher>(&self, hasher: &mut H) {
102 hasher.write_u32(self.facility as u32);
103 self.process.hash(hasher);
104 self.hostname.hash(hasher);
105 self.level.hash(hasher);
106 self.proto.hash(hasher);
107 self.timeout.hash(hasher);
108 self.server.hash(hasher);
109 }
110}
111
112impl Default for Syslog {
113 fn default() -> Self {
114 Self {
115 proto: SyslogProto::RFC3164,
116 facility: Facility::LOG_USER,
117 process: None,
118 hostname: None,
119 level: Level::Trace,
120 timeout: TIMEOUT_DEFAULT,
121 server: None,
122 }
123 }
124}
125
126impl Syslog {
127 pub fn new(facility: Facility, level: Level) -> Self {
128 let mut s = Self::default();
129 s.proto = SyslogProto::RFC3164;
130 s.facility = facility;
131 s.level = level;
132 s
133 }
134
135 pub fn timeout(mut self, d: Duration) -> Self {
136 self.timeout = d;
137 self
138 }
139
140 pub fn hostname(mut self, name: String) -> Self {
142 self.hostname = Some(name);
143 self
144 }
145
146 pub fn process_name(mut self, name: String) -> Self {
148 self.process = Some(name);
149 self
150 }
151
152 pub fn unix<P: Into<PathBuf>>(mut self, p: P) -> Self {
153 self.server = Some(SyslogAddr::Unix(p.into()));
154 self
155 }
156
157 pub fn tcp<S: AsRef<str>>(mut self, remote: S) -> Self {
158 self.server = Some(SyslogAddr::TCP(remote.as_ref().to_string()));
159 self
160 }
161
162 pub fn udp<S: AsRef<str>>(mut self, local: S, remote: S) -> Self {
163 self.server =
164 Some(SyslogAddr::UDP(local.as_ref().to_string(), remote.as_ref().to_string()));
165 self
166 }
167}
168
169impl SinkConfigBuild for Syslog {
170 fn build(&self) -> LogSink {
171 LogSink::Syslog(LogSinkSyslog::new(self))
172 }
173}
174
175impl SinkConfigTrait for Syslog {
176 fn get_level(&self) -> Level {
177 self.level
178 }
179
180 fn get_file_path(&self) -> Option<Box<Path>> {
181 None
182 }
183
184 fn write_hash(&self, hasher: &mut Box<dyn Hasher>) {
185 self.hash(hasher);
186 hasher.write(b"Syslog");
187 }
188}
189
/// Work item passed from the logging front-end to the backend writer thread.
enum Msg {
    /// One fully formatted syslog message, ready to be written as-is.
    Line(Vec<u8>),
    /// Flush request. The backend completes the `Once` after flushing, which
    /// releases the caller blocked in `Once::wait`.
    Flush(Arc<Once>),
}
194
195pub(crate) struct LogSinkSyslog {
196 tx: MTx<mpsc::Array<Msg>>,
197 format: Formatter3164,
198 max_level: Level,
199}
200
201impl LogSinkSyslog {
202 fn new(config: &Syslog) -> Self {
203 let (tx, rx) = mpsc::bounded_blocking(256);
204
205 macro_rules! fill_format {
206 ($f: expr, $config: expr) => {{
207 $f.facility = $config.facility;
208 if $config.server.is_some() {
209 if let Some(hostname) = &$config.hostname {
211 $f.hostname = Some(hostname.clone());
212 }
213 } else {
214 $f.hostname = None;
216 }
217 if let Some(process) = &$config.process {
218 $f.process = process.clone();
219 }
220 }};
221 }
222 let mut timeout = config.timeout;
223 if timeout == Duration::from_secs(0) {
224 timeout = TIMEOUT_DEFAULT;
225 }
226 let mut backend = Backend { server: config.server.clone(), timeout, writer: None };
227 let _ = backend.reinit();
228
229 let mut f = Formatter3164::default();
230 fill_format!(f, config);
231 thread::spawn(move || backend.run(rx));
232 Self { tx, max_level: config.level, format: f }
233 }
234}
235
236impl LogSinkTrait for LogSinkSyslog {
237 fn open(&self) -> std::io::Result<()> {
238 Ok(())
239 }
240
241 fn reopen(&self) -> std::io::Result<()> {
242 Ok(())
243 }
244
245 #[inline(always)]
246 fn log(&self, _now: &Timer, r: &Record) {
247 let l = r.level();
248 if r.level() <= self.max_level {
249 let mut buf = Vec::with_capacity(128);
250 let _level = match l {
251 Level::Trace => Severity::LOG_DEBUG, Level::Debug => Severity::LOG_DEBUG,
253 Level::Info => Severity::LOG_INFO,
254 Level::Warn => Severity::LOG_WARNING,
255 Level::Error => Severity::LOG_ERR,
256 };
257 let msg = format!("{}", r.args());
258 self.format.format(&mut buf, _level, msg).expect("format");
259 let _ = self.tx.send(Msg::Line(buf));
260 }
261 }
262
263 #[inline(always)]
264 fn flush(&self) {
265 let o = Arc::new(Once::new());
266 if self.tx.send(Msg::Flush(o.clone())).is_ok() {
267 o.wait();
268 }
269 }
270}
271
272struct Backend {
273 server: Option<SyslogAddr>,
274 writer: Option<SyslogBackend>,
275 timeout: Duration,
276}
277
278impl Backend {
279 #[inline]
280 fn connect_unix(path: &Path) -> Result<SyslogBackend> {
281 let sock = UnixDatagram::unbound()?;
282 match sock.connect(Path::new(path)) {
283 Ok(()) => {
284 println!("syslog: connect to unix {:?}", path);
285 return Ok(SyslogBackend::Unix(sock));
286 }
287 Err(e) => {
288 if e.raw_os_error() == Some(libc::EPROTOTYPE) {
289 let sock = UnixStream::connect(path)?;
290 println!("syslog: connect to unix {:?}", path);
291 return Ok(SyslogBackend::UnixStream(BufWriter::new(sock)));
292 }
293 return Err(e);
294 }
295 }
296 }
297
298 #[inline]
299 fn connect_tcp(s: &str, timeout: Duration) -> Result<SyslogBackend> {
300 for addr in s.to_socket_addrs()? {
301 let socket = TcpStream::connect_timeout(&addr, timeout)?;
302 return Ok(SyslogBackend::Tcp(BufWriter::new(socket)));
303 }
304 Err(Error::new(ErrorKind::NotFound, "syslog: no server address").into())
305 }
306
307 #[inline]
308 fn connect_udp(local: &str, remote: &str) -> Result<SyslogBackend> {
309 let server_addr = remote.to_socket_addrs().and_then(|mut server_addr_opt| {
310 server_addr_opt
311 .next()
312 .ok_or_else(|| Error::new(ErrorKind::NotFound, "syslog: no server address").into())
313 })?;
314 println!("syslog: connect to udp {:?}", remote);
315 let socket = UdpSocket::bind(local)?;
316 return Ok(SyslogBackend::Udp(socket, server_addr));
317 }
318
319 fn connect(server: &Option<SyslogAddr>, timeout: Duration) -> Result<SyslogBackend> {
320 match server {
321 Some(SyslogAddr::Unix(p)) => Self::connect_unix(p.as_path()),
322 Some(SyslogAddr::UDP(local, remote)) => Self::connect_udp(&local, &remote),
323 Some(SyslogAddr::TCP(remote)) => Self::connect_tcp(&remote, timeout),
324 None => {
325 for p in &UNIX_SOCK_PATHS {
326 if let Ok(backend) = Self::connect_unix(Path::new(p)) {
327 return Ok(backend);
328 }
329 }
330 return Self::connect_tcp(LOCAL_TCP, timeout);
331 }
335 }
336 }
337
338 #[inline(always)]
339 fn reinit(&mut self) -> Result<()> {
340 match Self::connect(&self.server, self.timeout) {
341 Err(e) => {
342 eprintln!("syslog: connect {:?} err {:?}", e, self.server);
343 return Err(e);
344 }
345 Ok(backend) => {
346 self.writer = Some(backend);
347 Ok(())
348 }
349 }
350 }
351
352 #[inline(always)]
353 fn flush(&mut self) {
354 if let Some(writer) = self.writer.as_mut() {
355 if let Err(e) = writer.flush() {
356 eprintln!("syslog: flush err {:?}", e);
357 self.writer = None;
358 }
359 }
360 }
361
362 #[inline]
363 fn write(&mut self, msg: &[u8]) {
364 if let Some(writer) = self.writer.as_mut() {
365 match writer.write_all(msg) {
366 Ok(_) => return,
367 Err(e) => {
368 eprintln!("syslog: write err {:?}", e);
369 self.writer = None;
370 }
371 }
372 }
373 let start_ts = Instant::now();
374 loop {
375 thread::sleep(Duration::from_millis(500));
376 if self.reinit().is_ok() {
377 if let Some(writer) = self.writer.as_mut() {
378 match writer.write_all(msg) {
379 Ok(_) => return,
380 Err(e) => {
381 eprintln!("syslog: write err {:?}", e);
382 self.writer = None;
383 }
384 }
385 }
386 }
387 if Instant::now().duration_since(start_ts) > self.timeout {
388 return;
390 }
391 }
392 }
393
394 fn run(&mut self, rx: Rx<mpsc::Array<Msg>>) {
395 loop {
396 match rx.recv() {
397 Ok(Msg::Line(_msg)) => {
398 self.write(&_msg);
399 let mut need_flush = true;
400 while let Ok(msg) = rx.try_recv() {
401 match msg {
402 Msg::Line(_msg) => self.write(&_msg),
403 Msg::Flush(o) => {
404 self.flush();
405 o.call_once(|| {});
406 need_flush = false;
407 }
408 }
409 }
410 if need_flush {
411 self.flush();
412 }
413 }
414 Ok(Msg::Flush(o)) => {
415 self.flush();
416 o.call_once(|| {});
417 }
418 Err(_) => {
419 self.flush();
420 return;
422 }
423 }
424 }
425 }
426}