syslog_rs/a_sync/async_tokio/
async_socket.rs1#[cfg(feature = "build_ext_tls")]
20mod with_tls
21{
22 use std::io::ErrorKind;
23
24 use tokio::{io::AsyncWriteExt, net::{TcpSocket, TcpStream}, time};
25 use tokio_rustls::{TlsConnector};
26 use tokio_rustls::client::TlsStream;
27
28 use crate::{a_sync::{AsyncSyslogTap, AsyncTap}, error::SyRes, map_error, map_error_os, SyslogDestMsg, SyslogTls, TapType};
29
30 impl AsyncSyslogTap<SyslogTls> for AsyncTap<TlsStream<TcpStream>, SyslogTls>
31 {
32 fn new(req_tap: SyslogTls) -> SyRes<Self>
33 {
34 let ret =
35 Self
36 {
37 sock: None,
38 tap_data: req_tap,
39 cur_tap_type: TapType::NetTcp,
40 };
41
42 return Ok(ret);
43 }
44
45 async
46 fn connectlog(&mut self) -> SyRes<()>
47 {
48
49 let tcp_socket =
50 if self.tap_data.get_remote_addr().is_ipv4() == true
51 {
52 TcpSocket::new_v4()
53 }
54 else
55 {
56 TcpSocket::new_v6()
57 }
58 .map_err(|e|
59 map_error!("cannot create tcp socket '{}', error: {}", self.tap_data.get_remote_addr(), e)
60 )?;
61
62 if self.tap_data.get_bind_addr().ip().is_unspecified() == false
64 {
65 tcp_socket
66 .bind(*self.tap_data.get_bind_addr())
67 .map_err(|e|
68 map_error!("cannot bind tcp socket '{}', error: {}", self.tap_data.get_remote_addr(), e)
69 )?;
70 }
71
72 let socket =
73 if let Some(c_timeout) = self.tap_data.get_get_conn_timeout()
74 {
75 time::timeout(c_timeout, tcp_socket.connect(*self.tap_data.get_remote_addr()))
76 .await
77 .map_err(|e|
78 map_error!("can not connect to tcp '{}', error: {}", self.tap_data.get_remote_addr(), e)
79 )?
80 }
81 else
82 {
83 tcp_socket
84 .connect(*self.tap_data.get_remote_addr())
85 .await
86 }
87 .map_err(|e|
88 map_error_os!(e, "cannot connect to tcp '{}', error: {}", self.tap_data.get_remote_addr(), e)
89 )?;
90
91 let connector = TlsConnector::from(self.tap_data.get_client_config());
92
93 let stream =
94 connector
95 .connect(self.tap_data.get_serv_name(), socket)
96 .await
97 .map_err(|e|
98 map_error!("Remote server: '{}', TLS connector error: '{}'", self.tap_data.get_remote_addr(), e)
99 )?;
100
101 self.sock = Some(stream);
103
104 return Ok(());
105 }
106
107 #[inline]
108 async
109 fn send(&mut self, msg: &[u8]) -> std::io::Result<usize>
110 {
111 let sock =
112 self.sock
113 .as_mut()
114 .ok_or_else(||
115 std::io::Error::new(ErrorKind::NotConnected, "no connection")
116 )?;
117
118 sock.write_all(msg).await?;
119 sock.flush().await?;
120
121 return Ok(msg.len());
122 }
123
124 async
125 fn disconnectlog(&mut self) -> std::io::Result<()>
126 {
127 match self.sock.take()
128 {
129 Some(mut s) =>
130 {
131 s.shutdown().await?;
132
133 drop(s);
134
135 Ok(())
136 },
137 None =>
138 {
139 Ok(())
140 }
141 }
142 }
143
144 #[inline]
145 fn is_connected(&self) -> bool
146 {
147 return self.sock.is_some();
148 }
149
150 #[inline]
151 fn get_type(&self) -> TapType
152 {
153 return self.cur_tap_type;
154 }
155
156 #[inline]
157 fn get_max_msg_size() -> usize
158 {
159 return crate::SyslogTls::get_max_msg_len();
160 }
161
162 #[inline]
163 fn update_tap_data(&mut self, tap_data: SyslogTls)
164 {
165 self.tap_data = tap_data;
166 }
167 }
168}
169
170
171#[cfg(feature = "build_ext_net")]
172mod with_net
173{
174 use std::io::ErrorKind;
175
176 use tokio::{io::AsyncWriteExt, net::{TcpSocket, TcpStream, UdpSocket}, time};
177
178 use crate::{a_sync::{AsyncSyslogTap, AsyncTap}, error::SyRes, map_error, map_error_os, SyslogDestMsg, SyslogNetTcp, SyslogNetUdp, TapType};
179
180 impl AsyncSyslogTap<SyslogNetTcp> for AsyncTap<TcpStream, SyslogNetTcp>
181 {
182 fn new(req_tap: SyslogNetTcp) -> SyRes<Self>
183 {
184 let ret =
185 Self
186 {
187 sock: None,
188 tap_data: req_tap,
189 cur_tap_type: TapType::NetTcp,
190 };
191
192 return Ok(ret);
193 }
194
195 async
196 fn connectlog(&mut self) -> SyRes<()>
197 {
198 let tcp_socket =
199 if self.tap_data.get_remote_addr().is_ipv4() == true
200 {
201 TcpSocket::new_v4()
202 }
203 else
204 {
205 TcpSocket::new_v6()
206 }
207 .map_err(|e|
208 map_error!("cannot create tcp socket '{}', error: {}", self.tap_data.get_remote_addr(), e)
209 )?;
210
211 if self.tap_data.get_bind_addr().ip().is_unspecified() == false
213 {
214 tcp_socket
215 .bind(*self.tap_data.get_bind_addr())
216 .map_err(|e|
217 map_error!("cannot bind tcp socket '{}', error: {}", self.tap_data.get_remote_addr(), e)
218 )?;
219 }
220
221 let socket =
222 if let Some(c_timeout) = self.tap_data.get_conn_timeout()
223 {
224 time::timeout(c_timeout, tcp_socket.connect(*self.tap_data.get_remote_addr()))
225 .await
226 .map_err(|e|
227 map_error!("can not connect to tcp '{}', error: {}", self.tap_data.get_remote_addr(), e)
228 )?
229 }
230 else
231 {
232 tcp_socket
233 .connect(*self.tap_data.get_remote_addr())
234 .await
235 }
236 .map_err(|e|
237 map_error_os!(e, "can not connect to tcp '{}', error: {}", self.tap_data.get_remote_addr(), e)
238 )?;
239
240
241 self.sock = Some(socket);
243
244 return Ok(());
245 }
246
247 #[inline]
248 async
249 fn send(&mut self, msg: &[u8]) -> std::io::Result<usize>
250 {
251 let sock =
252 self.sock
253 .as_mut()
254 .ok_or_else(||
255 std::io::Error::new(ErrorKind::NotConnected, "no connection")
256 )?;
257
258 sock.write_all(msg).await?;
259 sock.flush().await?;
260
261 return Ok(msg.len());
262 }
263
264 async
265 fn disconnectlog(&mut self) -> std::io::Result<()>
266 {
267 match self.sock.take()
268 {
269 Some(mut s) =>
270 {
271 s.shutdown().await?;
272
273 drop(s);
274
275 Ok(())
276 },
277 None =>
278 {
279 Ok(())
280 }
281 }
282 }
283
284 #[inline]
285 fn is_connected(&self) -> bool
286 {
287 return self.sock.is_some();
288 }
289
290 #[inline]
291 fn get_type(&self) -> TapType
292 {
293 return self.cur_tap_type;
294 }
295
296 #[inline]
297 fn get_max_msg_size() -> usize
298 {
299 return crate::SyslogNetTcp::get_max_msg_len();
300 }
301
302 #[inline]
303 fn update_tap_data(&mut self, tap_data: SyslogNetTcp)
304 {
305 self.tap_data = tap_data;
306 }
307 }
308
309 impl AsyncSyslogTap<SyslogNetUdp> for AsyncTap<UdpSocket, SyslogNetUdp>
310 {
311 fn new(req_tap: SyslogNetUdp) -> SyRes<Self>
312 {
313 let ret =
314 Self
315 {
316 sock: None,
317 tap_data: req_tap,
318 cur_tap_type: TapType::NetUdp,
319 };
320
321 return Ok(ret);
322 }
323
324 async
325 fn connectlog(&mut self) -> SyRes<()>
326 {
327 let socket =
328 UdpSocket
329 ::bind(self.tap_data.get_bind_addr())
330 .await
331 .map_err(|e|
332 map_error_os!(e, "can not bind udp '{}', error: {}", self.tap_data.get_bind_addr(), e)
333 )?;
334
335 socket
336 .connect(self.tap_data.get_remote_addr())
337 .await
338 .map_err(|e|
339 map_error_os!(e, "can not connect to udp '{}', error: {}", self.tap_data.get_remote_addr(), e)
340 )?;
341
342 self.sock = Some(socket);
344
345 return Ok(());
346 }
347
348 #[inline]
349 async
350 fn send(&mut self, msg: &[u8]) -> std::io::Result<usize>
351 {
352 let sock =
353 self
354 .sock
355 .as_mut()
356 .ok_or_else(||
357 std::io::Error::new(ErrorKind::NotConnected, "no connection")
358 )?;
359
360 return sock.send(msg).await;
361 }
362
363 async
364 fn disconnectlog(&mut self) -> std::io::Result<()>
365 {
366 match self.sock.take()
367 {
368 Some(s) =>
369 {
370 drop(s);
371
372 Ok(())
373 },
374 None =>
375 {
376 Ok(())
377 }
378 }
379 }
380
381 #[inline]
382 fn is_connected(&self) -> bool
383 {
384 return self.sock.is_some();
385 }
386
387 #[inline]
388 fn get_type(&self) -> TapType
389 {
390 return self.cur_tap_type;
391 }
392
393 #[inline]
394 fn get_max_msg_size() -> usize
395 {
396 return crate::SyslogNetUdp::get_max_msg_len();
397 }
398
399 #[inline]
400 fn update_tap_data(&mut self, tap_data: SyslogNetUdp)
401 {
402 self.tap_data = tap_data;
403 }
404 }
405}
406
407
408#[cfg(target_family = "unix")]
409mod with_unix_syslog
410{
411 use std::io::ErrorKind;
412 use std::net::Shutdown;
413
414 use tokio::net::{UnixDatagram};
415 use nix::errno::Errno;
416
417 use crate::a_sync::{AsyncSyslogTap, AsyncTap};
418 use crate::{SyslogDestMsg, SyslogLocal, TapType, map_error_os, throw_error_errno, throw_error_os};
419 use crate::common::*;
420 use crate::error::SyRes;
421
422
423 impl AsyncSyslogTap<SyslogLocal> for AsyncTap<UnixDatagram, SyslogLocal>
424 {
425
426 fn new(req_tap: SyslogLocal) -> SyRes<Self>
427 {
428 return Ok(
429 Self
430 {
431 sock: None,
432 tap_data: req_tap,
433 cur_tap_type: TapType::None,
434 }
435 );
436 }
437
438
439 async
440 fn connectlog(&mut self) -> SyRes<()>
441 {
442 let sock =
443 UnixDatagram
444 ::unbound()
445 .map_err(|e|
446 map_error_os!(e, "unbounded unix datagram initialization failure: {}", e)
447 )?;
448
449 let tap_type =
450 if self.tap_data.get_use_alternative() == false && self.tap_data.get_custom_remote_path().is_some() == true
451 {
452 if let Err(e) = sock.connect(self.tap_data.get_custom_remote_path().as_ref().unwrap())
453 {
454 throw_error_os!(e, "failed to open connection to syslog server at '{}'",
455 self.tap_data.get_custom_remote_path().as_ref().unwrap().display());
456 }
457 else
458 {
459 TapType::CustomLog
460 }
461 }
462 else if self.tap_data.get_custom_remote_path().is_some() == true &&
463 sock.connect(self.tap_data.get_custom_remote_path().as_ref().unwrap()).is_ok() == true
464 {
465 TapType::CustomLog
466 }
467 else if let Ok(_) = sock.connect(PATH_LOG_PRIV)
468 {
469 TapType::Priv
470 }
471 else if let Ok(_) = sock.connect(PATH_LOG)
472 {
473 TapType::UnPriv
474 }
475 else if let Ok(_) = sock.connect(PATH_OLDLOG)
476 {
477 TapType::OldLog
478 }
479 else if let Ok(_) = sock.connect(PATH_OSX)
480 {
481 TapType::Priv
482 }
483 else
484 {
485 throw_error_errno!(Errno::last(), "failed to open connection to syslog server");
487 };
488
489 #[cfg(any(
490 target_os = "freebsd",
491 target_os = "dragonfly",
492 target_os = "openbsd",
493 target_os = "netbsd",
494 target_os = "macos"
495 ))]
496 {
497 use std::os::fd::AsRawFd;
498
499 let mut len: std::mem::MaybeUninit<nix::libc::socklen_t> = std::mem::MaybeUninit::uninit();
501
502 let res =
504 unsafe
505 {
506 nix::libc::getsockopt(
507 sock.as_raw_fd(),
508 nix::libc::SOL_SOCKET,
509 nix::libc::SO_SNDBUF,
510 len.as_mut_ptr() as *mut nix::libc::c_void,
511 &mut {
512 std::mem::size_of::<nix::libc::socklen_t>() as nix::libc::socklen_t
513 }
514 )
515 };
516
517 if res == 0
518 {
519 let mut len = unsafe { len.assume_init() } as usize;
520
521 if len < MAXLINE
522 {
523 len = MAXLINE;
524
525 unsafe {
526 nix::libc::setsockopt(
527 sock.as_raw_fd(),
528 nix::libc::SOL_SOCKET,
529 nix::libc::SO_SNDBUF,
530 &len as *const _ as *const nix::libc::c_void,
531 std::mem::size_of::<nix::libc::socklen_t>() as nix::libc::socklen_t
532 )
533 };
534 }
535 }
536 }
537
538 sock
539 .shutdown(std::net::Shutdown::Read)
540 .map_err(|e|
541 map_error_os!(e, "can not shutdown read portion, error: '{}'", e)
542 )?;
543
544 self.sock = Some(sock);
545 self.cur_tap_type = tap_type;
546
547 return Ok(());
548 }
549
550 #[inline]
551 async
552 fn send(&mut self, msg: &[u8]) -> std::io::Result<usize>
553 {
554 let sock =
555 self.sock
556 .as_mut()
557 .ok_or_else(||
558 std::io::Error::new(ErrorKind::NotConnected, "no connection")
559 )?;
560
561 return sock.send(msg).await;
562 }
563
564 async
565 fn disconnectlog(&mut self) -> std::io::Result<()>
566 {
567 match self.sock.take()
568 {
569 Some(s) =>
570 {
571 self.cur_tap_type = TapType::None;
572
573 s.shutdown(Shutdown::Both)
574 },
575 None =>
576 {
577 self.cur_tap_type = TapType::None;
578
579 Ok(())
580 }
581 }
582 }
583
584 #[inline]
585 fn is_connected(&self) -> bool
586 {
587 return self.sock.is_some();
588 }
589
590 #[inline]
591 fn get_type(&self) -> TapType
592 {
593 return self.cur_tap_type;
594 }
595
596 #[inline]
597 fn get_max_msg_size() -> usize
598 {
599 return crate::SyslogLocal::get_max_msg_len();
600 }
601
602 #[inline]
603 fn update_tap_data(&mut self, tap_data: SyslogLocal)
604 {
605 self.tap_data = tap_data;
606 }
607 }
608}
609
610
611
612#[cfg(feature = "build_ext_file")]
613mod with_file
614{
615 use std::io::ErrorKind;
616
617 use tokio::fs::File;
618
619 use crate::{a_sync::{AsyncSyslogTap, AsyncTap}, error::SyRes, map_error_os, SyslogDestMsg, SyslogFile, TapType};
620
621 impl AsyncSyslogTap<SyslogFile> for AsyncTap<File, SyslogFile>
622 {
623 fn new(req_tap: SyslogFile) -> SyRes<Self>
624 {
625 let ret =
626 Self
627 {
628 sock: None,
629 tap_data: req_tap,
630 cur_tap_type: TapType::LocalFile,
631 };
632
633 return Ok(ret);
634 }
635
636 async
637 fn connectlog(&mut self) -> SyRes<()>
638 {
639
640 let file =
641 File::options()
642 .append(true)
643 .read(true)
644 .write(true)
645 .create(true)
646 .open(self.tap_data.get_path())
647 .await
648 .map_err(|e|
649 map_error_os!(e, "can not open file '{}' to write, error: '{}'",
650 self.tap_data.get_path().display(), e)
651 )?;
652
653 self.sock = Some(file);
654
655 return Ok(());
656 }
657
658 #[inline]
659 async
660 fn send(&mut self, msg: &[u8]) -> std::io::Result<usize>
661 {
662 use crate::tokio::io::AsyncWriteExt;
663
664 let sock =
665 self
666 .sock
667 .as_mut()
668 .ok_or_else(||
669 std::io::Error::new(ErrorKind::NotConnected, "no connection")
670 )?;
671
672 sock.write_all(msg).await?;
673
674 return Ok(msg.len());
675 }
676
677 async
678 fn disconnectlog(&mut self) -> std::io::Result<()>
679 {
680 match self.sock.take()
681 {
682 Some(s) =>
683 {
684 self.cur_tap_type = TapType::None;
685
686 s.sync_data().await?;
687
688 drop(s);
689
690 return Ok(());
691 },
692 None =>
693 {
694 self.cur_tap_type = TapType::None;
695
696 return Ok(())
697 }
698 }
699 }
700
701 #[inline]
702 fn is_connected(&self) -> bool
703 {
704 return self.sock.is_some();
705 }
706
707 #[inline]
708 fn get_type(&self) -> TapType
709 {
710 return self.cur_tap_type;
711 }
712
713 #[inline]
714 fn get_max_msg_size() -> usize
715 {
716 return crate::SyslogFile::get_max_msg_len();
717 }
718
719 #[inline]
720 fn update_tap_data(&mut self, tap_data: SyslogFile)
721 {
722 self.tap_data = tap_data;
723 }
724 }
725}
726
727
728#[cfg(target_family = "windows")]
731mod with_windows_eventlog
732{
733 use std::{fmt};
734 use std::io::ErrorKind;
735
736 use crate::portable::{EventLogLocal};
737 use crate::{error::SyRes, TapType};
738
739
740 use crate::
741 {
742 AsyncSyslogTap, AsyncTap, SyslogDestMsg, WindowsEvent, map_error_os
743 };
744
745 use crate::common::*;
746
747 impl AsyncSyslogTap<WindowsEvent> for AsyncTap<EventLogLocal, WindowsEvent>
748 {
749 fn new(req_tap: WindowsEvent) -> SyRes<Self>
761 {
762 return Ok(
763 Self
764 {
765 sock: None,
766 tap_data: req_tap,
767 cur_tap_type: TapType::None,
768 }
769 );
770 }
771
772 async
773 fn connectlog(&mut self) -> SyRes<()>
774 {
775 let sock =
776 EventLogLocal::new(&self.tap_data)
777 .map_err(|e|
778 map_error_os!(e, "EventRegister error: '{}'", e)
779 )?;
780
781 self.sock = Some(sock);
782 self.cur_tap_type = TapType::WindowsEventLog;
783
784 return Ok(());
785 }
786
787 #[inline]
788 async
789 fn send(&mut self, msg: &[u8]) -> std::io::Result<usize>
790 {
791 let sock =
792 self
793 .sock
794 .as_ref()
795 .ok_or_else(||
796 std::io::Error::new(ErrorKind::NotConnected, "no connection")
797 )?;
798
799 return sock.send(msg);
800 }
801
802 async
803 fn disconnectlog(&mut self) -> std::io::Result<()>
804 {
805 match self.sock.take()
806 {
807 Some(s) =>
808 {
809 self.cur_tap_type = TapType::None;
810
811 drop(s);
812
813 Ok(())
814 },
815 None =>
816 {
817 self.cur_tap_type = TapType::None;
818
819 Ok(())
820 }
821 }
822 }
823
824 #[inline]
825 fn is_connected(&self) -> bool
826 {
827 return self.sock.is_some();
828 }
829
830 #[inline]
831 fn get_type(&self) -> TapType
832 {
833 return self.cur_tap_type;
834 }
835
836 #[inline]
837 fn get_max_msg_size() -> usize
838 {
839 return WindowsEvent::get_max_msg_len();
840 }
841
842 #[inline]
843 fn update_tap_data(&mut self, tap_data: WindowsEvent)
844 {
845 self.tap_data = tap_data;
846 }
847 }
848}
849