1use crate::{
2 config::{SinkConfigBuild, SinkConfigTrait},
3 log_impl::{LogSink, LogSinkTrait},
4 time::Timer,
5};
6use crossfire::*;
7use log::{Level, Record};
8use std::hash::{Hash, Hasher};
9use std::io::{BufWriter, Error, ErrorKind, Result, Write};
10use std::net::{TcpStream, ToSocketAddrs, UdpSocket};
11use std::os::unix::net::{UnixDatagram, UnixStream};
12use std::path::{Path, PathBuf};
13use std::sync::{Arc, Once};
14use std::thread;
15use std::time::{Duration, Instant};
16pub use syslog::Facility;
17use syslog::{Formatter3164, LogFormat as SyslogFormat, LoggerBackend as SyslogBackend, Severity};
18
19const TIMEOUT_DEFAULT: Duration = Duration::from_secs(5);
20const UNIX_SOCK_PATHS: [&str; 3] = ["/dev/log", "/var/run/syslog", "/var/run/log"];
21const LOCAL_TCP: &'static str = "127.0.0.1:601";
24
25#[allow(dead_code)]
26const LOCAL_UDP: &'static str = "127.0.0.1:514";
27
28#[derive(Hash)]
29pub enum SyslogProto {
30 RFC3164,
31}
32#[derive(Hash, Clone, Debug)]
36pub enum SyslogAddr {
37 TCP(String),
39 UDP(String, String),
41 Unix(PathBuf),
43}
44
45pub struct Syslog {
85 pub facility: Facility,
87 pub process: Option<String>,
89 pub hostname: Option<String>,
91 pub level: Level,
93 pub proto: SyslogProto,
95 pub server: Option<SyslogAddr>,
97 pub timeout: Duration,
99}
100
101impl Hash for Syslog {
102 fn hash<H: Hasher>(&self, hasher: &mut H) {
103 hasher.write_u32(self.facility as u32);
104 self.process.hash(hasher);
105 self.hostname.hash(hasher);
106 self.level.hash(hasher);
107 self.proto.hash(hasher);
108 self.timeout.hash(hasher);
109 self.server.hash(hasher);
110 }
111}
112
113impl Default for Syslog {
114 fn default() -> Self {
115 Self {
116 proto: SyslogProto::RFC3164,
117 facility: Facility::LOG_USER,
118 process: None,
119 hostname: None,
120 level: Level::Trace,
121 timeout: TIMEOUT_DEFAULT,
122 server: None,
123 }
124 }
125}
126
127impl Syslog {
128 pub fn new(facility: Facility, level: Level) -> Self {
129 let mut s = Self::default();
130 s.proto = SyslogProto::RFC3164;
131 s.facility = facility;
132 s.level = level;
133 s
134 }
135
136 pub fn timeout(mut self, d: Duration) -> Self {
137 self.timeout = d;
138 self
139 }
140
141 pub fn hostname(mut self, name: String) -> Self {
143 self.hostname = Some(name);
144 self
145 }
146
147 pub fn process_name(mut self, name: String) -> Self {
149 self.process = Some(name);
150 self
151 }
152
153 pub fn unix<P: Into<PathBuf>>(mut self, p: P) -> Self {
154 self.server = Some(SyslogAddr::Unix(p.into()));
155 self
156 }
157
158 pub fn tcp<S: AsRef<str>>(mut self, remote: S) -> Self {
159 self.server = Some(SyslogAddr::TCP(remote.as_ref().to_string()));
160 self
161 }
162
163 pub fn udp<S: AsRef<str>>(mut self, local: S, remote: S) -> Self {
164 self.server =
165 Some(SyslogAddr::UDP(local.as_ref().to_string(), remote.as_ref().to_string()));
166 self
167 }
168}
169
170impl SinkConfigBuild for Syslog {
171 fn build(&self) -> LogSink {
172 LogSink::Syslog(LogSinkSyslog::new(self))
173 }
174}
175
176impl SinkConfigTrait for Syslog {
177 fn get_level(&self) -> Level {
178 self.level
179 }
180
181 fn get_file_path(&self) -> Option<Box<Path>> {
182 None
183 }
184
185 fn write_hash(&self, hasher: &mut Box<dyn Hasher>) {
186 self.hash(hasher);
187 hasher.write(b"Syslog");
188 }
189}
190
191enum Msg {
192 Line(Vec<u8>),
193 Flush(Arc<Once>),
194}
195
196pub(crate) struct LogSinkSyslog {
197 tx: MTx<Msg>,
198 format: Formatter3164,
199 max_level: Level,
200}
201
202impl LogSinkSyslog {
203 fn new(config: &Syslog) -> Self {
204 let (tx, rx) = mpsc::bounded_blocking(100);
205
206 macro_rules! fill_format {
207 ($f: expr, $config: expr) => {{
208 $f.facility = $config.facility;
209 if $config.server.is_some() {
210 if let Some(hostname) = &$config.hostname {
212 $f.hostname = Some(hostname.clone());
213 }
214 } else {
215 $f.hostname = None;
217 }
218 if let Some(process) = &$config.process {
219 $f.process = process.clone();
220 }
221 }};
222 }
223 let mut timeout = config.timeout;
224 if timeout == Duration::from_secs(0) {
225 timeout = TIMEOUT_DEFAULT;
226 }
227 let mut backend = Backend { server: config.server.clone(), timeout, writer: None };
228 let _ = backend.reinit();
229
230 let mut f = Formatter3164::default();
231 fill_format!(f, config);
232 thread::spawn(move || backend.run(rx));
233 Self { tx, max_level: config.level, format: f }
234 }
235}
236
237impl LogSinkTrait for LogSinkSyslog {
238 fn open(&self) -> std::io::Result<()> {
239 Ok(())
240 }
241
242 fn reopen(&self) -> std::io::Result<()> {
243 Ok(())
244 }
245
246 #[inline(always)]
247 fn log(&self, _now: &Timer, r: &Record) {
248 let l = r.level();
249 if r.level() <= self.max_level {
250 let mut buf = Vec::with_capacity(128);
251 let _level = match l {
252 Level::Trace => Severity::LOG_DEBUG, Level::Debug => Severity::LOG_DEBUG,
254 Level::Info => Severity::LOG_INFO,
255 Level::Warn => Severity::LOG_WARNING,
256 Level::Error => Severity::LOG_ERR,
257 };
258 let msg = format!("{}", r.args());
259 self.format.format(&mut buf, _level, msg).expect("format");
260 let _ = self.tx.send(Msg::Line(buf));
261 }
262 }
263
264 #[inline(always)]
265 fn flush(&self) {
266 let o = Arc::new(Once::new());
267 if self.tx.send(Msg::Flush(o.clone())).is_ok() {
268 o.wait();
269 }
270 }
271}
272
273struct Backend {
274 server: Option<SyslogAddr>,
275 writer: Option<SyslogBackend>,
276 timeout: Duration,
277}
278
279impl Backend {
280 #[inline]
281 fn connect_unix(path: &Path) -> Result<SyslogBackend> {
282 let sock = UnixDatagram::unbound()?;
283 match sock.connect(Path::new(path)) {
284 Ok(()) => {
285 println!("syslog: connect to unix {:?}", path);
286 return Ok(SyslogBackend::Unix(sock));
287 }
288 Err(e) => {
289 if e.raw_os_error() == Some(libc::EPROTOTYPE) {
290 let sock = UnixStream::connect(path)?;
291 println!("syslog: connect to unix {:?}", path);
292 return Ok(SyslogBackend::UnixStream(BufWriter::new(sock)));
293 }
294 return Err(e);
295 }
296 }
297 }
298
299 #[inline]
300 fn connect_tcp(s: &str, timeout: Duration) -> Result<SyslogBackend> {
301 for addr in s.to_socket_addrs()? {
302 let socket = TcpStream::connect_timeout(&addr, timeout)?;
303 return Ok(SyslogBackend::Tcp(BufWriter::new(socket)));
304 }
305 Err(Error::new(ErrorKind::NotFound, "syslog: no server address").into())
306 }
307
308 #[inline]
309 fn connect_udp(local: &str, remote: &str) -> Result<SyslogBackend> {
310 let server_addr = remote.to_socket_addrs().and_then(|mut server_addr_opt| {
311 server_addr_opt
312 .next()
313 .ok_or_else(|| Error::new(ErrorKind::NotFound, "syslog: no server address").into())
314 })?;
315 println!("syslog: connect to udp {:?}", remote);
316 let socket = UdpSocket::bind(local)?;
317 return Ok(SyslogBackend::Udp(socket, server_addr));
318 }
319
320 fn connect(server: &Option<SyslogAddr>, timeout: Duration) -> Result<SyslogBackend> {
321 match server {
322 Some(SyslogAddr::Unix(p)) => Self::connect_unix(p.as_path()),
323 Some(SyslogAddr::UDP(local, remote)) => Self::connect_udp(&local, &remote),
324 Some(SyslogAddr::TCP(remote)) => Self::connect_tcp(&remote, timeout),
325 None => {
326 for p in &UNIX_SOCK_PATHS {
327 if let Ok(backend) = Self::connect_unix(Path::new(p)) {
328 return Ok(backend);
329 }
330 }
331 return Self::connect_tcp(LOCAL_TCP, timeout);
332 }
336 }
337 }
338
339 #[inline(always)]
340 fn reinit(&mut self) -> Result<()> {
341 match Self::connect(&self.server, self.timeout) {
342 Err(e) => {
343 eprintln!("syslog: connect {:?} err {:?}", e, self.server);
344 return Err(e);
345 }
346 Ok(backend) => {
347 self.writer = Some(backend);
348 Ok(())
349 }
350 }
351 }
352
353 #[inline(always)]
354 fn flush(&mut self) {
355 if let Some(writer) = self.writer.as_mut() {
356 if let Err(e) = writer.flush() {
357 eprintln!("syslog: flush err {:?}", e);
358 self.writer = None;
359 }
360 }
361 }
362
363 #[inline]
364 fn write(&mut self, msg: &[u8]) {
365 if let Some(writer) = self.writer.as_mut() {
366 match writer.write_all(msg) {
367 Ok(_) => return,
368 Err(e) => {
369 eprintln!("syslog: write err {:?}", e);
370 self.writer = None;
371 }
372 }
373 }
374 let start_ts = Instant::now();
375 loop {
376 thread::sleep(Duration::from_millis(500));
377 if self.reinit().is_ok() {
378 if let Some(writer) = self.writer.as_mut() {
379 match writer.write_all(msg) {
380 Ok(_) => return,
381 Err(e) => {
382 eprintln!("syslog: write err {:?}", e);
383 self.writer = None;
384 }
385 }
386 }
387 }
388 if Instant::now().duration_since(start_ts) > self.timeout {
389 return;
391 }
392 }
393 }
394
395 fn run(&mut self, rx: Rx<Msg>) {
396 loop {
397 match rx.recv() {
398 Ok(Msg::Line(_msg)) => {
399 self.write(&_msg);
400 let mut need_flush = true;
401 while let Ok(msg) = rx.try_recv() {
402 match msg {
403 Msg::Line(_msg) => self.write(&_msg),
404 Msg::Flush(o) => {
405 self.flush();
406 o.call_once(|| {});
407 need_flush = false;
408 }
409 }
410 }
411 if need_flush {
412 self.flush();
413 }
414 }
415 Ok(Msg::Flush(o)) => {
416 self.flush();
417 o.call_once(|| {});
418 }
419 Err(_) => {
420 self.flush();
421 return;
423 }
424 }
425 }
426 }
427}