syslog_rs/a_sync/async_tokio/
async_socket.rs1use std::io::ErrorKind;
18use std::net::Shutdown;
19
20use tokio::net::{UnixDatagram};
21use nix::errno::Errno;
22
23use crate::a_sync::{AsyncSyslogTap, AsyncTap};
24
25
26
27use crate::{map_error_os, throw_error_errno, throw_error_os, SyslogDestMsg, SyslogLocal, TapType};
28
29use crate::common::*;
30use crate::error::SyRes;
31
32
#[cfg(feature = "build_ext_tls")]
mod with_tls
{
    use std::io::ErrorKind;

    use tokio::{io::AsyncWriteExt, net::{TcpSocket, TcpStream}, time};
    use tokio_rustls::{TlsConnector};
    use tokio_rustls::client::TlsStream;

    use crate::{a_sync::{AsyncSyslogTap, AsyncTap}, error::SyRes, map_error, map_error_os, SyslogDestMsg, SyslogTls, TapType};

    /// Tokio tap which delivers syslog messages through a TLS session
    /// established on top of a TCP stream.
    impl AsyncSyslogTap<SyslogTls> for AsyncTap<TlsStream<TcpStream>, SyslogTls>
    {
        /// Creates a tap in the disconnected state (no socket stored).
        ///
        /// `req_tap` is stored as the tap configuration. This constructor
        /// never returns an error.
        fn new(req_tap: SyslogTls) -> SyRes<Self>
        {
            return Ok(
                Self
                {
                    sock: None,
                    tap_data: req_tap,
                    cur_tap_type: TapType::NetTcp,
                }
            );
        }

        /// Opens the TCP connection and performs the TLS handshake.
        ///
        /// Steps:
        /// 1. a v4 or v6 TCP socket is created depending on the remote address;
        /// 2. the socket is bound when the configured bind address is not the
        ///    unspecified address;
        /// 3. the TCP connect is performed, bounded by the configured timeout
        ///    if one is present (the TLS handshake itself is awaited without
        ///    a timeout);
        /// 4. the TLS connector built from the client config performs the
        ///    handshake and the resulting stream is stored.
        ///
        /// # Errors
        ///
        /// An error is returned if the socket creation, bind, connect
        /// (including timeout expiration) or the TLS handshake fails.
        async
        fn connectlog(&mut self) -> SyRes<()>
        {
            let tcp_socket =
                if self.tap_data.get_remote_addr().is_ipv4()
                {
                    TcpSocket::new_v4()
                }
                else
                {
                    TcpSocket::new_v6()
                }
                .map_err(|e|
                    map_error!("cannot create tcp socket '{}', error: {}", self.tap_data.get_remote_addr(), e)
                )?;

            if !self.tap_data.get_bind_addr().ip().is_unspecified()
            {
                tcp_socket
                    .bind(*self.tap_data.get_bind_addr())
                    .map_err(|e|
                        // Report the local address which failed to bind, not the remote peer.
                        map_error!("cannot bind tcp socket '{}', error: {}", self.tap_data.get_bind_addr(), e)
                    )?;
            }

            // NOTE(review): the accessor is spelled `get_get_conn_timeout` here while the
            // plain TCP tap calls `get_conn_timeout`; presumably a naming slip in the
            // `SyslogTls` type - confirm before renaming.
            let socket =
                if let Some(c_timeout) = self.tap_data.get_get_conn_timeout()
                {
                    // The first `map_err` handles the elapsed timeout; the connect result
                    // itself is handled by the `map_err` after the `if`/`else`.
                    time::timeout(c_timeout, tcp_socket.connect(*self.tap_data.get_remote_addr()))
                        .await
                        .map_err(|e|
                            map_error!("can not connect to tcp '{}', error: {}", self.tap_data.get_remote_addr(), e)
                        )?
                }
                else
                {
                    tcp_socket
                        .connect(*self.tap_data.get_remote_addr())
                        .await
                }
                .map_err(|e|
                    map_error_os!(e, "cannot connect to tcp '{}', error: {}", self.tap_data.get_remote_addr(), e)
                )?;

            let connector = TlsConnector::from(self.tap_data.get_client_config());

            let stream =
                connector
                    .connect(self.tap_data.get_serv_name(), socket)
                    .await
                    .map_err(|e|
                        map_error!("Remote server: '{}', TLS connector error: '{}'", self.tap_data.get_remote_addr(), e)
                    )?;

            self.sock = Some(stream);

            return Ok(());
        }

        /// Writes the whole `msg` to the TLS stream and flushes it.
        ///
        /// Returns `msg.len()` on success. Fails with
        /// [`ErrorKind::NotConnected`] when no stream is stored, or with the
        /// I/O error of the write/flush.
        #[inline]
        async
        fn send(&mut self, msg: &[u8]) -> std::io::Result<usize>
        {
            let sock =
                self.sock
                    .as_mut()
                    .ok_or_else(||
                        std::io::Error::new(ErrorKind::NotConnected, "no connection")
                    )?;

            sock.write_all(msg).await?;
            sock.flush().await?;

            return Ok(msg.len());
        }

        /// Shuts the stream down and drops it.
        ///
        /// The stream is taken out of the tap first, so the tap is left
        /// disconnected even when the shutdown returns an error. Calling it
        /// on a disconnected tap is a no-op.
        async
        fn disconnectlog(&mut self) -> std::io::Result<()>
        {
            match self.sock.take()
            {
                Some(mut s) =>
                {
                    s.shutdown().await?;

                    drop(s);

                    Ok(())
                },
                None =>
                {
                    Ok(())
                }
            }
        }

        /// Returns `true` when a stream is stored in the tap. The liveness of
        /// the connection is not probed.
        #[inline]
        fn is_connected(&self) -> bool
        {
            return self.sock.is_some();
        }

        /// Returns the tap type currently recorded for this instance.
        #[inline]
        fn get_type(&self) -> TapType
        {
            return self.cur_tap_type;
        }

        /// Returns the maximum message length reported by [`SyslogTls`].
        #[inline]
        fn get_max_msg_size() -> usize
        {
            return crate::SyslogTls::get_max_msg_len();
        }

        /// Replaces the stored tap configuration. The current connection,
        /// if any, is left untouched.
        #[inline]
        fn update_tap_data(&mut self, tap_data: SyslogTls)
        {
            self.tap_data = tap_data;
        }
    }
}
183
184
#[cfg(feature = "build_ext_net")]
mod with_net
{
    use std::io::ErrorKind;

    use tokio::{io::AsyncWriteExt, net::{TcpSocket, TcpStream, UdpSocket}, time};

    use crate::{a_sync::{AsyncSyslogTap, AsyncTap}, error::SyRes, map_error, map_error_os, SyslogDestMsg, SyslogNetTcp, SyslogNetUdp, TapType};

    /// Tokio tap which delivers syslog messages over a plain TCP stream.
    impl AsyncSyslogTap<SyslogNetTcp> for AsyncTap<TcpStream, SyslogNetTcp>
    {
        /// Creates a tap in the disconnected state (no socket stored).
        ///
        /// `req_tap` is stored as the tap configuration. This constructor
        /// never returns an error.
        fn new(req_tap: SyslogNetTcp) -> SyRes<Self>
        {
            return Ok(
                Self
                {
                    sock: None,
                    tap_data: req_tap,
                    cur_tap_type: TapType::NetTcp,
                }
            );
        }

        /// Opens the TCP connection to the configured remote address.
        ///
        /// A v4 or v6 socket is created depending on the remote address, it
        /// is bound when the configured bind address is not the unspecified
        /// address, and the connect is bounded by the configured timeout if
        /// one is present.
        ///
        /// # Errors
        ///
        /// An error is returned if the socket creation, bind or connect
        /// (including timeout expiration) fails.
        async
        fn connectlog(&mut self) -> SyRes<()>
        {
            let tcp_socket =
                if self.tap_data.get_remote_addr().is_ipv4()
                {
                    TcpSocket::new_v4()
                }
                else
                {
                    TcpSocket::new_v6()
                }
                .map_err(|e|
                    map_error!("cannot create tcp socket '{}', error: {}", self.tap_data.get_remote_addr(), e)
                )?;

            if !self.tap_data.get_bind_addr().ip().is_unspecified()
            {
                tcp_socket
                    .bind(*self.tap_data.get_bind_addr())
                    .map_err(|e|
                        // Report the local address which failed to bind, not the remote peer.
                        map_error!("cannot bind tcp socket '{}', error: {}", self.tap_data.get_bind_addr(), e)
                    )?;
            }

            let socket =
                if let Some(c_timeout) = self.tap_data.get_conn_timeout()
                {
                    // The first `map_err` handles the elapsed timeout; the connect result
                    // itself is handled by the `map_err` after the `if`/`else`.
                    time::timeout(c_timeout, tcp_socket.connect(*self.tap_data.get_remote_addr()))
                        .await
                        .map_err(|e|
                            map_error!("can not connect to tcp '{}', error: {}", self.tap_data.get_remote_addr(), e)
                        )?
                }
                else
                {
                    tcp_socket
                        .connect(*self.tap_data.get_remote_addr())
                        .await
                }
                .map_err(|e|
                    map_error_os!(e, "can not connect to tcp '{}', error: {}", self.tap_data.get_remote_addr(), e)
                )?;

            self.sock = Some(socket);

            return Ok(());
        }

        /// Writes the whole `msg` to the TCP stream and flushes it.
        ///
        /// Returns `msg.len()` on success. Fails with
        /// [`ErrorKind::NotConnected`] when no stream is stored, or with the
        /// I/O error of the write/flush.
        #[inline]
        async
        fn send(&mut self, msg: &[u8]) -> std::io::Result<usize>
        {
            let sock =
                self.sock
                    .as_mut()
                    .ok_or_else(||
                        std::io::Error::new(ErrorKind::NotConnected, "no connection")
                    )?;

            sock.write_all(msg).await?;
            sock.flush().await?;

            return Ok(msg.len());
        }

        /// Shuts the stream down and drops it.
        ///
        /// The stream is taken out of the tap first, so the tap is left
        /// disconnected even when the shutdown returns an error. Calling it
        /// on a disconnected tap is a no-op.
        async
        fn disconnectlog(&mut self) -> std::io::Result<()>
        {
            match self.sock.take()
            {
                Some(mut s) =>
                {
                    s.shutdown().await?;

                    drop(s);

                    Ok(())
                },
                None =>
                {
                    Ok(())
                }
            }
        }

        /// Returns `true` when a stream is stored in the tap. The liveness of
        /// the connection is not probed.
        #[inline]
        fn is_connected(&self) -> bool
        {
            return self.sock.is_some();
        }

        /// Returns the tap type currently recorded for this instance.
        #[inline]
        fn get_type(&self) -> TapType
        {
            return self.cur_tap_type;
        }

        /// Returns the maximum message length reported by [`SyslogNetTcp`].
        #[inline]
        fn get_max_msg_size() -> usize
        {
            return crate::SyslogNetTcp::get_max_msg_len();
        }

        /// Replaces the stored tap configuration. The current connection,
        /// if any, is left untouched.
        #[inline]
        fn update_tap_data(&mut self, tap_data: SyslogNetTcp)
        {
            self.tap_data = tap_data;
        }
    }

    /// Tokio tap which delivers syslog messages over a connected UDP socket.
    impl AsyncSyslogTap<SyslogNetUdp> for AsyncTap<UdpSocket, SyslogNetUdp>
    {
        /// Creates a tap in the disconnected state (no socket stored).
        ///
        /// `req_tap` is stored as the tap configuration. This constructor
        /// never returns an error.
        fn new(req_tap: SyslogNetUdp) -> SyRes<Self>
        {
            return Ok(
                Self
                {
                    sock: None,
                    tap_data: req_tap,
                    cur_tap_type: TapType::NetUdp,
                }
            );
        }

        /// Binds a UDP socket to the configured bind address and connects it
        /// to the configured remote address.
        ///
        /// # Errors
        ///
        /// An error is returned if either the bind or the connect fails.
        async
        fn connectlog(&mut self) -> SyRes<()>
        {
            let socket =
                UdpSocket
                    ::bind(self.tap_data.get_bind_addr())
                    .await
                    .map_err(|e|
                        map_error_os!(e, "can not bind udp '{}', error: {}", self.tap_data.get_bind_addr(), e)
                    )?;

            socket
                .connect(self.tap_data.get_remote_addr())
                .await
                .map_err(|e|
                    map_error_os!(e, "can not connect to udp '{}', error: {}", self.tap_data.get_remote_addr(), e)
                )?;

            self.sock = Some(socket);

            return Ok(());
        }

        /// Sends `msg` as a single datagram to the connected peer.
        ///
        /// Returns the number of bytes reported by the socket. Fails with
        /// [`ErrorKind::NotConnected`] when no socket is stored.
        #[inline]
        async
        fn send(&mut self, msg: &[u8]) -> std::io::Result<usize>
        {
            let sock =
                self
                    .sock
                    .as_mut()
                    .ok_or_else(||
                        std::io::Error::new(ErrorKind::NotConnected, "no connection")
                    )?;

            return sock.send(msg).await;
        }

        /// Drops the stored socket, if any. Always succeeds.
        async
        fn disconnectlog(&mut self) -> std::io::Result<()>
        {
            match self.sock.take()
            {
                Some(s) =>
                {
                    drop(s);

                    Ok(())
                },
                None =>
                {
                    Ok(())
                }
            }
        }

        /// Returns `true` when a socket is stored in the tap.
        #[inline]
        fn is_connected(&self) -> bool
        {
            return self.sock.is_some();
        }

        /// Returns the tap type currently recorded for this instance.
        #[inline]
        fn get_type(&self) -> TapType
        {
            return self.cur_tap_type;
        }

        /// Returns the maximum message length reported by [`SyslogNetUdp`].
        #[inline]
        fn get_max_msg_size() -> usize
        {
            return crate::SyslogNetUdp::get_max_msg_len();
        }

        /// Replaces the stored tap configuration. The current socket,
        /// if any, is left untouched.
        #[inline]
        fn update_tap_data(&mut self, tap_data: SyslogNetUdp)
        {
            self.tap_data = tap_data;
        }
    }
}
420
421
422impl AsyncSyslogTap<SyslogLocal> for AsyncTap<UnixDatagram, SyslogLocal>
423{
424
425 fn new(req_tap: SyslogLocal) -> SyRes<Self>
426 {
427 return Ok(
428 Self
429 {
430 sock: None,
431 tap_data: req_tap,
432 cur_tap_type: TapType::None,
433 }
434 );
435 }
436
437
438 async
439 fn connectlog(&mut self) -> SyRes<()>
440 {
441 let sock =
442 UnixDatagram
443 ::unbound()
444 .map_err(|e|
445 map_error_os!(e, "unbounded unix datagram initialization failure: {}", e)
446 )?;
447
448 let tap_type =
449 if self.tap_data.get_use_alternative() == false && self.tap_data.get_custom_remote_path().is_some() == true
450 {
451 if let Err(e) = sock.connect(self.tap_data.get_custom_remote_path().as_ref().unwrap())
452 {
453 throw_error_os!(e, "failed to open connection to syslog server at '{}'",
454 self.tap_data.get_custom_remote_path().as_ref().unwrap().display());
455 }
456 else
457 {
458 TapType::CustomLog
459 }
460 }
461 else if self.tap_data.get_custom_remote_path().is_some() == true &&
462 sock.connect(self.tap_data.get_custom_remote_path().as_ref().unwrap()).is_ok() == true
463 {
464 TapType::CustomLog
465 }
466 else if let Ok(_) = sock.connect(PATH_LOG_PRIV)
467 {
468 TapType::Priv
469 }
470 else if let Ok(_) = sock.connect(PATH_LOG)
471 {
472 TapType::UnPriv
473 }
474 else if let Ok(_) = sock.connect(PATH_OLDLOG)
475 {
476 TapType::OldLog
477 }
478 else if let Ok(_) = sock.connect(PATH_OSX)
479 {
480 TapType::Priv
481 }
482 else
483 {
484 throw_error_errno!(Errno::last(), "failed to open connection to syslog server");
486 };
487
488 #[cfg(any(
489 target_os = "freebsd",
490 target_os = "dragonfly",
491 target_os = "openbsd",
492 target_os = "netbsd",
493 target_os = "macos"
494 ))]
495 {
496 use std::os::fd::AsRawFd;
497
498 let mut len: std::mem::MaybeUninit<nix::libc::socklen_t> = std::mem::MaybeUninit::uninit();
500
501 let res =
503 unsafe
504 {
505 nix::libc::getsockopt(
506 sock.as_raw_fd(),
507 nix::libc::SOL_SOCKET,
508 nix::libc::SO_SNDBUF,
509 len.as_mut_ptr() as *mut nix::libc::c_void,
510 &mut {
511 std::mem::size_of::<nix::libc::socklen_t>() as nix::libc::socklen_t
512 }
513 )
514 };
515
516 if res == 0
517 {
518 let mut len = unsafe { len.assume_init() } as usize;
519
520 if len < MAXLINE
521 {
522 len = MAXLINE;
523
524 unsafe {
525 nix::libc::setsockopt(
526 sock.as_raw_fd(),
527 nix::libc::SOL_SOCKET,
528 nix::libc::SO_SNDBUF,
529 &len as *const _ as *const nix::libc::c_void,
530 std::mem::size_of::<nix::libc::socklen_t>() as nix::libc::socklen_t
531 )
532 };
533 }
534 }
535 }
536
537 sock
538 .shutdown(std::net::Shutdown::Read)
539 .map_err(|e|
540 map_error_os!(e, "can not shutdown read portion, error: '{}'", e)
541 )?;
542
543 self.sock = Some(sock);
544 self.cur_tap_type = tap_type;
545
546 return Ok(());
547 }
548
549 #[inline]
550 async
551 fn send(&mut self, msg: &[u8]) -> std::io::Result<usize>
552 {
553 let sock =
554 self.sock
555 .as_mut()
556 .ok_or_else(||
557 std::io::Error::new(ErrorKind::NotConnected, "no connection")
558 )?;
559
560 return sock.send(msg).await;
561 }
562
563 async
564 fn disconnectlog(&mut self) -> std::io::Result<()>
565 {
566 match self.sock.take()
567 {
568 Some(s) =>
569 {
570 self.cur_tap_type = TapType::None;
571
572 s.shutdown(Shutdown::Both)
573 },
574 None =>
575 {
576 self.cur_tap_type = TapType::None;
577
578 Ok(())
579 }
580 }
581 }
582
583 #[inline]
584 fn is_connected(&self) -> bool
585 {
586 return self.sock.is_some();
587 }
588
589 #[inline]
590 fn get_type(&self) -> TapType
591 {
592 return self.cur_tap_type;
593 }
594
595 #[inline]
596 fn get_max_msg_size() -> usize
597 {
598 return crate::SyslogLocal::get_max_msg_len();
599 }
600
601 #[inline]
602 fn update_tap_data(&mut self, tap_data: SyslogLocal)
603 {
604 self.tap_data = tap_data;
605 }
606}
607
608
609
#[cfg(feature = "build_ext_file")]
mod with_file
{
    use std::io::ErrorKind;

    use tokio::fs::File;

    use crate::{a_sync::{AsyncSyslogTap, AsyncTap}, error::SyRes, map_error_os, SyslogDestMsg, SyslogFile, TapType};

    /// Tokio tap which appends syslog messages to a local file.
    impl AsyncSyslogTap<SyslogFile> for AsyncTap<File, SyslogFile>
    {
        /// Creates a tap in the disconnected state (no file opened).
        ///
        /// `req_tap` is stored as the tap configuration. This constructor
        /// never returns an error.
        fn new(req_tap: SyslogFile) -> SyRes<Self>
        {
            return Ok(
                Self
                {
                    sock: None,
                    tap_data: req_tap,
                    cur_tap_type: TapType::LocalFile,
                }
            );
        }

        /// Opens the configured file in append mode (creating it if it does
        /// not exist) and stores the handle.
        ///
        /// # Errors
        ///
        /// An error is returned if the file cannot be opened.
        async
        fn connectlog(&mut self) -> SyRes<()>
        {
            let file =
                File::options()
                    .append(true)
                    .read(true)
                    .write(true)
                    .create(true)
                    .open(self.tap_data.get_path())
                    .await
                    .map_err(|e|
                        map_error_os!(e, "can not open file '{}' to write, error: '{}'",
                            self.tap_data.get_path().display(), e)
                    )?;

            self.sock = Some(file);

            // `disconnectlog` resets the type to `TapType::None`; restore it so
            // that a reconnected tap reports the same type as a fresh one.
            self.cur_tap_type = TapType::LocalFile;

            return Ok(());
        }

        /// Appends the whole `msg` to the file and waits for the write to
        /// complete.
        ///
        /// Returns `msg.len()` on success. Fails with
        /// [`ErrorKind::NotConnected`] when no file is opened, or with the
        /// I/O error of the write/flush.
        #[inline]
        async
        fn send(&mut self, msg: &[u8]) -> std::io::Result<usize>
        {
            use crate::tokio::io::AsyncWriteExt;

            let sock =
                self
                    .sock
                    .as_mut()
                    .ok_or_else(||
                        std::io::Error::new(ErrorKind::NotConnected, "no connection")
                    )?;

            sock.write_all(msg).await?;

            // A tokio `File` may complete the write in the background; flushing
            // waits for it, so a failure is reported for this message instead of
            // a later one (or never).
            sock.flush().await?;

            return Ok(msg.len());
        }

        /// Resets the tap type to [`TapType::None`] and, if a file is opened,
        /// syncs its data to disk and closes it.
        ///
        /// The handle is taken out of the tap first, so the tap is left
        /// disconnected even when the sync returns an error.
        async
        fn disconnectlog(&mut self) -> std::io::Result<()>
        {
            self.cur_tap_type = TapType::None;

            match self.sock.take()
            {
                Some(s) =>
                {
                    s.sync_data().await?;

                    drop(s);

                    return Ok(());
                },
                None =>
                {
                    return Ok(());
                }
            }
        }

        /// Returns `true` when a file handle is stored in the tap.
        #[inline]
        fn is_connected(&self) -> bool
        {
            return self.sock.is_some();
        }

        /// Returns the tap type currently recorded for this instance.
        #[inline]
        fn get_type(&self) -> TapType
        {
            return self.cur_tap_type;
        }

        /// Returns the maximum message length reported by [`SyslogFile`].
        #[inline]
        fn get_max_msg_size() -> usize
        {
            return crate::SyslogFile::get_max_msg_len();
        }

        /// Replaces the stored tap configuration. The currently opened file,
        /// if any, is left untouched.
        #[inline]
        fn update_tap_data(&mut self, tap_data: SyslogFile)
        {
            self.tap_data = tap_data;
        }
    }
}
724