syslog_rs/a_sync/async_tokio/
async_socket.rs1use std::io::ErrorKind;
16use std::net::Shutdown;
17
18use tokio::net::{UnixDatagram};
19use nix::errno::Errno;
20
21use crate::a_sync::{AsyncSyslogTap, AsyncTap};
22
23
24
25use crate::{map_error_os, throw_error_errno, throw_error_os, SyslogDestMsg, SyslogLocal, TapType};
26
27use crate::common::*;
28use crate::error::SyRes;
29
30
31#[cfg(feature = "build_ext_tls")]
32mod with_tls
33{
34 use std::io::ErrorKind;
35
36 use tokio::{io::AsyncWriteExt, net::{TcpSocket, TcpStream}, time};
37 use tokio_rustls::{TlsConnector};
38 use tokio_rustls::client::TlsStream;
39
40 use crate::{a_sync::{AsyncSyslogTap, AsyncTap}, error::SyRes, map_error, map_error_os, SyslogDestMsg, SyslogTls, TapType};
41
42 impl AsyncSyslogTap<SyslogTls> for AsyncTap<TlsStream<TcpStream>, SyslogTls>
43 {
44 fn new(req_tap: SyslogTls) -> SyRes<Self>
45 {
46 let ret =
47 Self
48 {
49 sock: None,
50 tap_data: req_tap,
51 cur_tap_type: TapType::NetTcp,
52 };
53
54 return Ok(ret);
55 }
56
57 async
58 fn connectlog(&mut self) -> SyRes<()>
59 {
60
61 let tcp_socket =
62 if self.tap_data.get_remote_addr().is_ipv4() == true
63 {
64 TcpSocket::new_v4()
65 }
66 else
67 {
68 TcpSocket::new_v6()
69 }
70 .map_err(|e|
71 map_error!("cannot create tcp socket '{}', error: {}", self.tap_data.get_remote_addr(), e)
72 )?;
73
74 if self.tap_data.get_bind_addr().ip().is_unspecified() == false
76 {
77 tcp_socket
78 .bind(*self.tap_data.get_bind_addr())
79 .map_err(|e|
80 map_error!("cannot bind tcp socket '{}', error: {}", self.tap_data.get_remote_addr(), e)
81 )?;
82 }
83
84 let socket =
85 if let Some(c_timeout) = self.tap_data.get_get_conn_timeout()
86 {
87 time::timeout(c_timeout, tcp_socket.connect(*self.tap_data.get_remote_addr()))
88 .await
89 .map_err(|e|
90 map_error!("can not connect to tcp '{}', error: {}", self.tap_data.get_remote_addr(), e)
91 )?
92 }
93 else
94 {
95 tcp_socket
96 .connect(*self.tap_data.get_remote_addr())
97 .await
98 }
99 .map_err(|e|
100 map_error_os!(e, "cannot connect to tcp '{}', error: {}", self.tap_data.get_remote_addr(), e)
101 )?;
102
103 let connector = TlsConnector::from(self.tap_data.get_client_config());
104
105 let stream =
106 connector
107 .connect(self.tap_data.get_serv_name(), socket)
108 .await
109 .map_err(|e|
110 map_error!("Remote server: '{}', TLS connector error: '{}'", self.tap_data.get_remote_addr(), e)
111 )?;
112
113 self.sock = Some(stream);
115
116 return Ok(());
117 }
118
119 #[inline]
120 async
121 fn send(&mut self, msg: &[u8]) -> std::io::Result<usize>
122 {
123 let sock =
124 self.sock
125 .as_mut()
126 .ok_or_else(||
127 std::io::Error::new(ErrorKind::NotConnected, "no connection")
128 )?;
129
130 sock.write_all(msg).await?;
131 sock.flush().await?;
132
133 return Ok(msg.len());
134 }
135
136 async
137 fn disconnectlog(&mut self) -> std::io::Result<()>
138 {
139 match self.sock.take()
140 {
141 Some(mut s) =>
142 {
143 s.shutdown().await?;
144
145 drop(s);
146
147 Ok(())
148 },
149 None =>
150 {
151 Ok(())
152 }
153 }
154 }
155
156 #[inline]
157 fn is_connected(&self) -> bool
158 {
159 return self.sock.is_some();
160 }
161
162 #[inline]
163 fn get_type(&self) -> TapType
164 {
165 return self.cur_tap_type;
166 }
167
168 #[inline]
169 fn get_max_msg_size() -> usize
170 {
171 return crate::SyslogTls::get_max_msg_len();
172 }
173
174 #[inline]
175 fn update_tap_data(&mut self, tap_data: SyslogTls)
176 {
177 self.tap_data = tap_data;
178 }
179 }
180}
181
182
183#[cfg(feature = "build_ext_net")]
184mod with_net
185{
186 use std::io::ErrorKind;
187
188 use tokio::{io::AsyncWriteExt, net::{TcpSocket, TcpStream, UdpSocket}, time};
189
190 use crate::{a_sync::{AsyncSyslogTap, AsyncTap}, error::SyRes, map_error, map_error_os, SyslogDestMsg, SyslogNetTcp, SyslogNetUdp, TapType};
191
192 impl AsyncSyslogTap<SyslogNetTcp> for AsyncTap<TcpStream, SyslogNetTcp>
193 {
194 fn new(req_tap: SyslogNetTcp) -> SyRes<Self>
195 {
196 let ret =
197 Self
198 {
199 sock: None,
200 tap_data: req_tap,
201 cur_tap_type: TapType::NetTcp,
202 };
203
204 return Ok(ret);
205 }
206
207 async
208 fn connectlog(&mut self) -> SyRes<()>
209 {
210 let tcp_socket =
211 if self.tap_data.get_remote_addr().is_ipv4() == true
212 {
213 TcpSocket::new_v4()
214 }
215 else
216 {
217 TcpSocket::new_v6()
218 }
219 .map_err(|e|
220 map_error!("cannot create tcp socket '{}', error: {}", self.tap_data.get_remote_addr(), e)
221 )?;
222
223 if self.tap_data.get_bind_addr().ip().is_unspecified() == false
225 {
226 tcp_socket
227 .bind(*self.tap_data.get_bind_addr())
228 .map_err(|e|
229 map_error!("cannot bind tcp socket '{}', error: {}", self.tap_data.get_remote_addr(), e)
230 )?;
231 }
232
233 let socket =
234 if let Some(c_timeout) = self.tap_data.get_conn_timeout()
235 {
236 time::timeout(c_timeout, tcp_socket.connect(*self.tap_data.get_remote_addr()))
237 .await
238 .map_err(|e|
239 map_error!("can not connect to tcp '{}', error: {}", self.tap_data.get_remote_addr(), e)
240 )?
241 }
242 else
243 {
244 tcp_socket
245 .connect(*self.tap_data.get_remote_addr())
246 .await
247 }
248 .map_err(|e|
249 map_error_os!(e, "can not connect to tcp '{}', error: {}", self.tap_data.get_remote_addr(), e)
250 )?;
251
252
253 self.sock = Some(socket);
255
256 return Ok(());
257 }
258
259 #[inline]
260 async
261 fn send(&mut self, msg: &[u8]) -> std::io::Result<usize>
262 {
263 let sock =
264 self.sock
265 .as_mut()
266 .ok_or_else(||
267 std::io::Error::new(ErrorKind::NotConnected, "no connection")
268 )?;
269
270 sock.write_all(msg).await?;
271 sock.flush().await?;
272
273 return Ok(msg.len());
274 }
275
276 async
277 fn disconnectlog(&mut self) -> std::io::Result<()>
278 {
279 match self.sock.take()
280 {
281 Some(mut s) =>
282 {
283 s.shutdown().await?;
284
285 drop(s);
286
287 Ok(())
288 },
289 None =>
290 {
291 Ok(())
292 }
293 }
294 }
295
296 #[inline]
297 fn is_connected(&self) -> bool
298 {
299 return self.sock.is_some();
300 }
301
302 #[inline]
303 fn get_type(&self) -> TapType
304 {
305 return self.cur_tap_type;
306 }
307
308 #[inline]
309 fn get_max_msg_size() -> usize
310 {
311 return crate::SyslogNetTcp::get_max_msg_len();
312 }
313
314 #[inline]
315 fn update_tap_data(&mut self, tap_data: SyslogNetTcp)
316 {
317 self.tap_data = tap_data;
318 }
319 }
320
321 impl AsyncSyslogTap<SyslogNetUdp> for AsyncTap<UdpSocket, SyslogNetUdp>
322 {
323 fn new(req_tap: SyslogNetUdp) -> SyRes<Self>
324 {
325 let ret =
326 Self
327 {
328 sock: None,
329 tap_data: req_tap,
330 cur_tap_type: TapType::NetUdp,
331 };
332
333 return Ok(ret);
334 }
335
336 async
337 fn connectlog(&mut self) -> SyRes<()>
338 {
339 let socket =
340 UdpSocket
341 ::bind(self.tap_data.get_bind_addr())
342 .await
343 .map_err(|e|
344 map_error_os!(e, "can not bind udp '{}', error: {}", self.tap_data.get_bind_addr(), e)
345 )?;
346
347 socket
348 .connect(self.tap_data.get_remote_addr())
349 .await
350 .map_err(|e|
351 map_error_os!(e, "can not connect to udp '{}', error: {}", self.tap_data.get_remote_addr(), e)
352 )?;
353
354 self.sock = Some(socket);
356
357 return Ok(());
358 }
359
360 #[inline]
361 async
362 fn send(&mut self, msg: &[u8]) -> std::io::Result<usize>
363 {
364 let sock =
365 self
366 .sock
367 .as_mut()
368 .ok_or_else(||
369 std::io::Error::new(ErrorKind::NotConnected, "no connection")
370 )?;
371
372 return sock.send(msg).await;
373 }
374
375 async
376 fn disconnectlog(&mut self) -> std::io::Result<()>
377 {
378 match self.sock.take()
379 {
380 Some(s) =>
381 {
382 drop(s);
383
384 Ok(())
385 },
386 None =>
387 {
388 Ok(())
389 }
390 }
391 }
392
393 #[inline]
394 fn is_connected(&self) -> bool
395 {
396 return self.sock.is_some();
397 }
398
399 #[inline]
400 fn get_type(&self) -> TapType
401 {
402 return self.cur_tap_type;
403 }
404
405 #[inline]
406 fn get_max_msg_size() -> usize
407 {
408 return crate::SyslogNetUdp::get_max_msg_len();
409 }
410
411 #[inline]
412 fn update_tap_data(&mut self, tap_data: SyslogNetUdp)
413 {
414 self.tap_data = tap_data;
415 }
416 }
417}
418
419
420impl AsyncSyslogTap<SyslogLocal> for AsyncTap<UnixDatagram, SyslogLocal>
421{
422
423 fn new(req_tap: SyslogLocal) -> SyRes<Self>
424 {
425 return Ok(
426 Self
427 {
428 sock: None,
429 tap_data: req_tap,
430 cur_tap_type: TapType::None,
431 }
432 );
433 }
434
435
436 async
437 fn connectlog(&mut self) -> SyRes<()>
438 {
439 let sock =
440 UnixDatagram
441 ::unbound()
442 .map_err(|e|
443 map_error_os!(e, "unbounded unix datagram initialization failure: {}", e)
444 )?;
445
446 let tap_type =
447 if self.tap_data.get_use_alternative() == false && self.tap_data.get_custom_remote_path().is_some() == true
448 {
449 if let Err(e) = sock.connect(self.tap_data.get_custom_remote_path().as_ref().unwrap())
450 {
451 throw_error_os!(e, "failed to open connection to syslog server at '{}'",
452 self.tap_data.get_custom_remote_path().as_ref().unwrap().display());
453 }
454 else
455 {
456 TapType::CustomLog
457 }
458 }
459 else if self.tap_data.get_custom_remote_path().is_some() == true &&
460 sock.connect(self.tap_data.get_custom_remote_path().as_ref().unwrap()).is_ok() == true
461 {
462 TapType::CustomLog
463 }
464 else if let Ok(_) = sock.connect(PATH_LOG_PRIV)
465 {
466 TapType::Priv
467 }
468 else if let Ok(_) = sock.connect(PATH_LOG)
469 {
470 TapType::UnPriv
471 }
472 else if let Ok(_) = sock.connect(PATH_OLDLOG)
473 {
474 TapType::OldLog
475 }
476 else if let Ok(_) = sock.connect(PATH_OSX)
477 {
478 TapType::Priv
479 }
480 else
481 {
482 throw_error_errno!(Errno::last(), "failed to open connection to syslog server");
484 };
485
486 #[cfg(any(
487 target_os = "freebsd",
488 target_os = "dragonfly",
489 target_os = "openbsd",
490 target_os = "netbsd",
491 target_os = "macos"
492 ))]
493 {
494 use std::os::fd::AsRawFd;
495
496 let mut len: std::mem::MaybeUninit<nix::libc::socklen_t> = std::mem::MaybeUninit::uninit();
498
499 let res =
501 unsafe
502 {
503 nix::libc::getsockopt(
504 sock.as_raw_fd(),
505 nix::libc::SOL_SOCKET,
506 nix::libc::SO_SNDBUF,
507 len.as_mut_ptr() as *mut nix::libc::c_void,
508 &mut {
509 std::mem::size_of::<nix::libc::socklen_t>() as nix::libc::socklen_t
510 }
511 )
512 };
513
514 if res == 0
515 {
516 let mut len = unsafe { len.assume_init() } as usize;
517
518 if len < MAXLINE
519 {
520 len = MAXLINE;
521
522 unsafe {
523 nix::libc::setsockopt(
524 sock.as_raw_fd(),
525 nix::libc::SOL_SOCKET,
526 nix::libc::SO_SNDBUF,
527 &len as *const _ as *const nix::libc::c_void,
528 std::mem::size_of::<nix::libc::socklen_t>() as nix::libc::socklen_t
529 )
530 };
531 }
532 }
533 }
534
535 sock
536 .shutdown(std::net::Shutdown::Read)
537 .map_err(|e|
538 map_error_os!(e, "can not shutdown read portion, error: '{}'", e)
539 )?;
540
541 self.sock = Some(sock);
542 self.cur_tap_type = tap_type;
543
544 return Ok(());
545 }
546
547 #[inline]
548 async
549 fn send(&mut self, msg: &[u8]) -> std::io::Result<usize>
550 {
551 let sock =
552 self.sock
553 .as_mut()
554 .ok_or_else(||
555 std::io::Error::new(ErrorKind::NotConnected, "no connection")
556 )?;
557
558 return sock.send(msg).await;
559 }
560
561 async
562 fn disconnectlog(&mut self) -> std::io::Result<()>
563 {
564 match self.sock.take()
565 {
566 Some(s) =>
567 {
568 self.cur_tap_type = TapType::None;
569
570 s.shutdown(Shutdown::Both)
571 },
572 None =>
573 {
574 self.cur_tap_type = TapType::None;
575
576 Ok(())
577 }
578 }
579 }
580
581 #[inline]
582 fn is_connected(&self) -> bool
583 {
584 return self.sock.is_some();
585 }
586
587 #[inline]
588 fn get_type(&self) -> TapType
589 {
590 return self.cur_tap_type;
591 }
592
593 #[inline]
594 fn get_max_msg_size() -> usize
595 {
596 return crate::SyslogLocal::get_max_msg_len();
597 }
598
599 #[inline]
600 fn update_tap_data(&mut self, tap_data: SyslogLocal)
601 {
602 self.tap_data = tap_data;
603 }
604}
605
606
607
608#[cfg(feature = "build_ext_file")]
609mod with_file
610{
611 use std::io::ErrorKind;
612
613 use tokio::fs::File;
614
615 use crate::{a_sync::{AsyncSyslogTap, AsyncTap}, error::SyRes, map_error_os, SyslogDestMsg, SyslogFile, TapType};
616
617 impl AsyncSyslogTap<SyslogFile> for AsyncTap<File, SyslogFile>
618 {
619 fn new(req_tap: SyslogFile) -> SyRes<Self>
620 {
621 let ret =
622 Self
623 {
624 sock: None,
625 tap_data: req_tap,
626 cur_tap_type: TapType::LocalFile,
627 };
628
629 return Ok(ret);
630 }
631
632 async
633 fn connectlog(&mut self) -> SyRes<()>
634 {
635
636 let file =
637 File::options()
638 .append(true)
639 .read(true)
640 .write(true)
641 .create(true)
642 .open(self.tap_data.get_path())
643 .await
644 .map_err(|e|
645 map_error_os!(e, "can not open file '{}' to write, error: '{}'",
646 self.tap_data.get_path().display(), e)
647 )?;
648
649 self.sock = Some(file);
650
651 return Ok(());
652 }
653
654 #[inline]
655 async
656 fn send(&mut self, msg: &[u8]) -> std::io::Result<usize>
657 {
658 use crate::tokio::io::AsyncWriteExt;
659
660 let sock =
661 self
662 .sock
663 .as_mut()
664 .ok_or_else(||
665 std::io::Error::new(ErrorKind::NotConnected, "no connection")
666 )?;
667
668 sock.write_all(msg).await?;
669
670 return Ok(msg.len());
671 }
672
673 async
674 fn disconnectlog(&mut self) -> std::io::Result<()>
675 {
676 match self.sock.take()
677 {
678 Some(s) =>
679 {
680 self.cur_tap_type = TapType::None;
681
682 s.sync_data().await?;
683
684 drop(s);
685
686 return Ok(());
687 },
688 None =>
689 {
690 self.cur_tap_type = TapType::None;
691
692 return Ok(())
693 }
694 }
695 }
696
697 #[inline]
698 fn is_connected(&self) -> bool
699 {
700 return self.sock.is_some();
701 }
702
703 #[inline]
704 fn get_type(&self) -> TapType
705 {
706 return self.cur_tap_type;
707 }
708
709 #[inline]
710 fn get_max_msg_size() -> usize
711 {
712 return crate::SyslogFile::get_max_msg_len();
713 }
714
715 #[inline]
716 fn update_tap_data(&mut self, tap_data: SyslogFile)
717 {
718 self.tap_data = tap_data;
719 }
720 }
721}
722