syslog_rs/a_sync/async_tokio/
async_socket.rs1use std::io::ErrorKind;
16use std::net::Shutdown;
17
18use tokio::net::{UnixDatagram};
19use nix::errno::Errno;
20
21use crate::a_sync::{AsyncSyslogTap, AsyncTap};
22
23
24
25use crate::{map_error_os, throw_error_errno, throw_error_os, SyslogDestMsg, SyslogLocal, TapType};
26
27use crate::common::*;
28use crate::error::SyRes;
29
30
#[cfg(feature = "build_ext_tls")]
mod with_tls
{
    use std::io::ErrorKind;

    use tokio::{io::AsyncWriteExt, net::{TcpSocket, TcpStream}, time};
    use tokio_rustls::{TlsConnector};
    use tokio_rustls::client::TlsStream;

    use crate::{a_sync::{AsyncSyslogTap, AsyncTap}, error::SyRes, map_error, map_error_os, SyslogDestMsg, SyslogTls, TapType};

    /// Async tap which delivers messages over a TLS session established on
    /// top of a tokio TCP connection.
    impl AsyncSyslogTap<SyslogTls> for AsyncTap<TlsStream<TcpStream>, SyslogTls>
    {
        /// Creates a tap in the disconnected state (`sock` is `None`).
        ///
        /// The tap type is set to [TapType::NetTcp]. This implementation
        /// always returns `Ok`.
        fn new(req_tap: SyslogTls) -> SyRes<Self>
        {
            let ret =
                Self
                {
                    sock: None,
                    tap_data: req_tap,
                    cur_tap_type: TapType::NetTcp,
                };

            return Ok(ret);
        }

        /// Opens the TCP connection to the configured remote address and
        /// performs the TLS handshake on it.
        ///
        /// Steps:
        /// 1. a v4 or v6 socket is created depending on the remote address;
        /// 2. the socket is bound when the bind address is not unspecified;
        /// 3. the socket is connected, optionally limited by a timeout;
        /// 4. the TLS connector is run over the connected stream.
        ///
        /// On success the TLS stream is stored in `self.sock`. Any failure
        /// is mapped to the crate error type and returned.
        async
        fn connectlog(&mut self) -> SyRes<()>
        {
            // pick the address family which matches the remote address
            let new_sock_res =
                if self.tap_data.get_remote_addr().is_ipv4()
                {
                    TcpSocket::new_v4()
                }
                else
                {
                    TcpSocket::new_v6()
                };

            let tcp_socket =
                new_sock_res
                    .map_err(|e|
                        map_error!("cannot create tcp socket '{}', error: {}", self.tap_data.get_remote_addr(), e)
                    )?;

            // an unspecified bind address means "do not bind explicitly"
            if !self.tap_data.get_bind_addr().ip().is_unspecified()
            {
                // report the address which failed to bind, not the remote one
                tcp_socket
                    .bind(*self.tap_data.get_bind_addr())
                    .map_err(|e|
                        map_error!("cannot bind tcp socket '{}', error: {}", self.tap_data.get_bind_addr(), e)
                    )?;
            }

            // NOTE(review): the accessor is spelled `get_get_conn_timeout` here
            // while the plain TCP tap calls `get_conn_timeout`; presumably this
            // is how `SyslogTls` declares it - confirm.
            let connect_res =
                if let Some(c_timeout) = self.tap_data.get_get_conn_timeout()
                {
                    // the first `map_err` handles the elapsed timeout, the
                    // connect result itself is handled below
                    time::timeout(c_timeout, tcp_socket.connect(*self.tap_data.get_remote_addr()))
                        .await
                        .map_err(|e|
                            map_error!("can not connect to tcp '{}', error: {}", self.tap_data.get_remote_addr(), e)
                        )?
                }
                else
                {
                    tcp_socket
                        .connect(*self.tap_data.get_remote_addr())
                        .await
                };

            let socket =
                connect_res
                    .map_err(|e|
                        map_error_os!(e, "cannot connect to tcp '{}', error: {}", self.tap_data.get_remote_addr(), e)
                    )?;

            let connector = TlsConnector::from(self.tap_data.get_client_config());

            let stream =
                connector
                    .connect(self.tap_data.get_serv_name(), socket)
                    .await
                    .map_err(|e|
                        map_error!("Remote server: '{}', TLS connector error: '{}'", self.tap_data.get_remote_addr(), e)
                    )?;

            self.sock = Some(stream);

            return Ok(());
        }

        /// Writes the whole `msg` to the TLS stream and flushes it.
        ///
        /// Returns `msg.len()` on success. Fails with
        /// [ErrorKind::NotConnected] when the tap has no open connection.
        async
        fn send(&mut self, msg: &[u8]) -> std::io::Result<usize>
        {
            let sock =
                self.sock
                    .as_mut()
                    .ok_or_else(||
                        std::io::Error::new(ErrorKind::NotConnected, "no connection")
                    )?;

            sock.write_all(msg).await?;
            sock.flush().await?;

            return Ok(msg.len());
        }

        /// Shuts down and drops the stream, if there is one.
        ///
        /// The stream is taken out of the tap before the shutdown is
        /// attempted, so the tap is disconnected even if shutdown fails.
        async
        fn disconnectlog(&mut self) -> std::io::Result<()>
        {
            if let Some(mut s) = self.sock.take()
            {
                s.shutdown().await?;

                // `s` is dropped here, at the end of the scope
            }

            return Ok(());
        }

        /// Returns `true` when a stream is currently stored in the tap.
        fn is_connected(&self) -> bool
        {
            return self.sock.is_some();
        }

        /// Returns the tap type recorded for this instance.
        fn get_type(&self) -> TapType
        {
            return self.cur_tap_type;
        }

        /// Returns the maximum message length reported by the tap data.
        fn get_max_msg_size(&self) -> usize
        {
            return self.tap_data.get_max_msg_len();
        }

        /// Replaces the stored tap configuration. An already opened
        /// connection is not touched.
        fn update_tap_data(&mut self, tap_data: SyslogTls)
        {
            self.tap_data = tap_data;
        }
    }
}
176
177
#[cfg(feature = "build_ext_net")]
mod with_net
{
    use std::io::ErrorKind;

    use tokio::{io::AsyncWriteExt, net::{TcpSocket, TcpStream, UdpSocket}, time};

    use crate::{a_sync::{AsyncSyslogTap, AsyncTap}, error::SyRes, map_error, map_error_os, SyslogDestMsg, SyslogNetTcp, SyslogNetUdp, TapType};

    /// Async tap which delivers messages over a plain tokio TCP stream.
    impl AsyncSyslogTap<SyslogNetTcp> for AsyncTap<TcpStream, SyslogNetTcp>
    {
        /// Creates a tap in the disconnected state (`sock` is `None`) with
        /// the tap type set to [TapType::NetTcp]. Always returns `Ok`.
        fn new(req_tap: SyslogNetTcp) -> SyRes<Self>
        {
            let ret =
                Self
                {
                    sock: None,
                    tap_data: req_tap,
                    cur_tap_type: TapType::NetTcp,
                };

            return Ok(ret);
        }

        /// Opens the TCP connection to the configured remote address.
        ///
        /// A v4 or v6 socket is created depending on the remote address,
        /// bound when the bind address is not unspecified, and connected,
        /// optionally limited by the configured connection timeout.
        ///
        /// On success the stream is stored in `self.sock`. Any failure is
        /// mapped to the crate error type and returned.
        async
        fn connectlog(&mut self) -> SyRes<()>
        {
            // pick the address family which matches the remote address
            let new_sock_res =
                if self.tap_data.get_remote_addr().is_ipv4()
                {
                    TcpSocket::new_v4()
                }
                else
                {
                    TcpSocket::new_v6()
                };

            let tcp_socket =
                new_sock_res
                    .map_err(|e|
                        map_error!("cannot create tcp socket '{}', error: {}", self.tap_data.get_remote_addr(), e)
                    )?;

            // an unspecified bind address means "do not bind explicitly"
            if !self.tap_data.get_bind_addr().ip().is_unspecified()
            {
                // report the address which failed to bind, not the remote one
                tcp_socket
                    .bind(*self.tap_data.get_bind_addr())
                    .map_err(|e|
                        map_error!("cannot bind tcp socket '{}', error: {}", self.tap_data.get_bind_addr(), e)
                    )?;
            }

            let connect_res =
                if let Some(c_timeout) = self.tap_data.get_conn_timeout()
                {
                    // the first `map_err` handles the elapsed timeout, the
                    // connect result itself is handled below
                    time::timeout(c_timeout, tcp_socket.connect(*self.tap_data.get_remote_addr()))
                        .await
                        .map_err(|e|
                            map_error!("can not connect to tcp '{}', error: {}", self.tap_data.get_remote_addr(), e)
                        )?
                }
                else
                {
                    tcp_socket
                        .connect(*self.tap_data.get_remote_addr())
                        .await
                };

            let socket =
                connect_res
                    .map_err(|e|
                        map_error_os!(e, "can not connect to tcp '{}', error: {}", self.tap_data.get_remote_addr(), e)
                    )?;

            self.sock = Some(socket);

            return Ok(());
        }

        /// Writes the whole `msg` to the stream and flushes it.
        ///
        /// Returns `msg.len()` on success. Fails with
        /// [ErrorKind::NotConnected] when the tap has no open connection.
        async
        fn send(&mut self, msg: &[u8]) -> std::io::Result<usize>
        {
            let sock =
                self.sock
                    .as_mut()
                    .ok_or_else(||
                        std::io::Error::new(ErrorKind::NotConnected, "no connection")
                    )?;

            sock.write_all(msg).await?;
            sock.flush().await?;

            return Ok(msg.len());
        }

        /// Shuts down and drops the stream, if there is one.
        ///
        /// The stream is taken out of the tap before the shutdown is
        /// attempted, so the tap is disconnected even if shutdown fails.
        async
        fn disconnectlog(&mut self) -> std::io::Result<()>
        {
            if let Some(mut s) = self.sock.take()
            {
                s.shutdown().await?;

                // `s` is dropped here, at the end of the scope
            }

            return Ok(());
        }

        /// Returns `true` when a stream is currently stored in the tap.
        fn is_connected(&self) -> bool
        {
            return self.sock.is_some();
        }

        /// Returns the tap type recorded for this instance.
        fn get_type(&self) -> TapType
        {
            return self.cur_tap_type;
        }

        /// Returns the maximum message length reported by the tap data.
        fn get_max_msg_size(&self) -> usize
        {
            return self.tap_data.get_max_msg_len();
        }

        /// Replaces the stored tap configuration. An already opened
        /// connection is not touched.
        fn update_tap_data(&mut self, tap_data: SyslogNetTcp)
        {
            self.tap_data = tap_data;
        }
    }

    /// Async tap which delivers messages over a connected tokio UDP socket.
    impl AsyncSyslogTap<SyslogNetUdp> for AsyncTap<UdpSocket, SyslogNetUdp>
    {
        /// Creates a tap in the disconnected state (`sock` is `None`) with
        /// the tap type set to [TapType::NetUdp]. Always returns `Ok`.
        fn new(req_tap: SyslogNetUdp) -> SyRes<Self>
        {
            let ret =
                Self
                {
                    sock: None,
                    tap_data: req_tap,
                    cur_tap_type: TapType::NetUdp,
                };

            return Ok(ret);
        }

        /// Binds a UDP socket to the configured bind address and connects
        /// it to the configured remote address.
        ///
        /// On success the socket is stored in `self.sock`. A bind or connect
        /// failure is mapped to the crate error type and returned.
        async
        fn connectlog(&mut self) -> SyRes<()>
        {
            let socket =
                UdpSocket
                    ::bind(self.tap_data.get_bind_addr())
                    .await
                    .map_err(|e|
                        map_error_os!(e, "can not bind udp '{}', error: {}", self.tap_data.get_bind_addr(), e)
                    )?;

            socket
                .connect(self.tap_data.get_remote_addr())
                .await
                .map_err(|e|
                    map_error_os!(e, "can not connect to udp '{}', error: {}", self.tap_data.get_remote_addr(), e)
                )?;

            self.sock = Some(socket);

            return Ok(());
        }

        /// Sends `msg` as a single datagram and returns the result of the
        /// socket's `send` call.
        ///
        /// Fails with [ErrorKind::NotConnected] when the tap has no socket.
        async
        fn send(&mut self, msg: &[u8]) -> std::io::Result<usize>
        {
            let sock =
                self
                    .sock
                    .as_mut()
                    .ok_or_else(||
                        std::io::Error::new(ErrorKind::NotConnected, "no connection")
                    )?;

            return sock.send(msg).await;
        }

        /// Drops the socket, if there is one. Never fails.
        async
        fn disconnectlog(&mut self) -> std::io::Result<()>
        {
            // taking the socket out and dropping it is all that is done here
            drop(self.sock.take());

            return Ok(());
        }

        /// Returns `true` when a socket is currently stored in the tap.
        fn is_connected(&self) -> bool
        {
            return self.sock.is_some();
        }

        /// Returns the tap type recorded for this instance.
        fn get_type(&self) -> TapType
        {
            return self.cur_tap_type;
        }

        /// Returns the maximum message length reported by the tap data.
        fn get_max_msg_size(&self) -> usize
        {
            return self.tap_data.get_max_msg_len();
        }

        /// Replaces the stored tap configuration. An already opened socket
        /// is not touched.
        fn update_tap_data(&mut self, tap_data: SyslogNetUdp)
        {
            self.tap_data = tap_data;
        }
    }
}
402
403
404impl AsyncSyslogTap<SyslogLocal> for AsyncTap<UnixDatagram, SyslogLocal>
405{
406
407 fn new(req_tap: SyslogLocal) -> SyRes<Self>
408 {
409 return Ok(
410 Self
411 {
412 sock: None,
413 tap_data: req_tap,
414 cur_tap_type: TapType::None,
415 }
416 );
417 }
418
419
420 async
421 fn connectlog(&mut self) -> SyRes<()>
422 {
423 let sock =
424 UnixDatagram
425 ::unbound()
426 .map_err(|e|
427 map_error_os!(e, "unbounded unix datagram initialization failure: {}", e)
428 )?;
429
430 let tap_type =
431 if self.tap_data.get_use_alternative() == false && self.tap_data.get_custom_remote_path().is_some() == true
432 {
433 if let Err(e) = sock.connect(self.tap_data.get_custom_remote_path().as_ref().unwrap())
434 {
435 throw_error_os!(e, "failed to open connection to syslog server at '{}'",
436 self.tap_data.get_custom_remote_path().as_ref().unwrap().display());
437 }
438 else
439 {
440 TapType::CustomLog
441 }
442 }
443 else if self.tap_data.get_custom_remote_path().is_some() == true &&
444 sock.connect(self.tap_data.get_custom_remote_path().as_ref().unwrap()).is_ok() == true
445 {
446 TapType::CustomLog
447 }
448 else if let Ok(_) = sock.connect(PATH_LOG_PRIV)
449 {
450 TapType::Priv
451 }
452 else if let Ok(_) = sock.connect(PATH_LOG)
453 {
454 TapType::UnPriv
455 }
456 else if let Ok(_) = sock.connect(PATH_OLDLOG)
457 {
458 TapType::OldLog
459 }
460 else if let Ok(_) = sock.connect(PATH_OSX)
461 {
462 TapType::Priv
463 }
464 else
465 {
466 throw_error_errno!(Errno::last(), "failed to open connection to syslog server");
468 };
469
470 #[cfg(any(
471 target_os = "freebsd",
472 target_os = "dragonfly",
473 target_os = "openbsd",
474 target_os = "netbsd",
475 target_os = "macos"
476 ))]
477 {
478 use std::os::fd::AsRawFd;
479
480 let mut len: std::mem::MaybeUninit<nix::libc::socklen_t> = std::mem::MaybeUninit::uninit();
482
483 let res =
485 unsafe
486 {
487 nix::libc::getsockopt(
488 sock.as_raw_fd(),
489 nix::libc::SOL_SOCKET,
490 nix::libc::SO_SNDBUF,
491 len.as_mut_ptr() as *mut nix::libc::c_void,
492 &mut {
493 std::mem::size_of::<nix::libc::socklen_t>() as nix::libc::socklen_t
494 }
495 )
496 };
497
498 if res == 0
499 {
500 let mut len = unsafe { len.assume_init() } as usize;
501
502 if len < MAXLINE
503 {
504 len = MAXLINE;
505
506 unsafe {
507 nix::libc::setsockopt(
508 sock.as_raw_fd(),
509 nix::libc::SOL_SOCKET,
510 nix::libc::SO_SNDBUF,
511 &len as *const _ as *const nix::libc::c_void,
512 std::mem::size_of::<nix::libc::socklen_t>() as nix::libc::socklen_t
513 )
514 };
515 }
516 }
517 }
518
519 self.sock = Some(sock);
520 self.cur_tap_type = tap_type;
521
522 return Ok(());
523 }
524
525 async
526 fn send(&mut self, msg: &[u8]) -> std::io::Result<usize>
527 {
528 let sock =
529 self.sock
530 .as_mut()
531 .ok_or_else(||
532 std::io::Error::new(ErrorKind::NotConnected, "no connection")
533 )?;
534
535 return sock.send(msg).await;
536 }
537
538 async
539 fn disconnectlog(&mut self) -> std::io::Result<()>
540 {
541 match self.sock.take()
542 {
543 Some(s) =>
544 {
545 self.cur_tap_type = TapType::None;
546
547 s.shutdown(Shutdown::Both)
548 },
549 None =>
550 {
551 self.cur_tap_type = TapType::None;
552
553 Ok(())
554 }
555 }
556 }
557
558 fn is_connected(&self) -> bool
559 {
560 return self.sock.is_some();
561 }
562
563 fn get_type(&self) -> TapType
564 {
565 return self.cur_tap_type;
566 }
567
568 fn get_max_msg_size(&self) -> usize
569 {
570 return self.tap_data.get_max_msg_len();
571 }
572
573 fn update_tap_data(&mut self, tap_data: SyslogLocal)
574 {
575 self.tap_data = tap_data;
576 }
577}
578
579
580
#[cfg(feature = "build_ext_file")]
mod with_file
{
    use std::io::ErrorKind;

    use tokio::fs::File;

    use crate::{a_sync::{AsyncSyslogTap, AsyncTap}, error::SyRes, map_error_os, SyslogDestMsg, SyslogFile, TapType};

    /// Async tap which appends messages to a local file through tokio's
    /// [File].
    impl AsyncSyslogTap<SyslogFile> for AsyncTap<File, SyslogFile>
    {
        /// Creates a tap in the disconnected state (`sock` is `None`) with
        /// the tap type set to [TapType::LocalFile]. Always returns `Ok`.
        fn new(req_tap: SyslogFile) -> SyRes<Self>
        {
            let ret =
                Self
                {
                    sock: None,
                    tap_data: req_tap,
                    cur_tap_type: TapType::LocalFile,
                };

            return Ok(ret);
        }

        /// Opens the configured file with the append, read, write and
        /// create options and stores the handle in `self.sock`.
        ///
        /// An open failure is mapped to the crate error type and returned.
        async
        fn connectlog(&mut self) -> SyRes<()>
        {
            let file =
                File::options()
                    .append(true)
                    .read(true)
                    .write(true)
                    .create(true)
                    .open(self.tap_data.get_path())
                    .await
                    .map_err(|e|
                        map_error_os!(e, "can not open file '{}' to write, error: '{}'",
                            self.tap_data.get_path().display(), e)
                    )?;

            self.sock = Some(file);

            // `disconnectlog` resets the type to `TapType::None`, so it has to
            // be restored here, otherwise a reconnected tap reports `None`.
            self.cur_tap_type = TapType::LocalFile;

            return Ok(());
        }

        /// Writes the whole `msg` to the file and flushes the handle.
        ///
        /// Returns `msg.len()` on success. Fails with
        /// [ErrorKind::NotConnected] when the file is not opened.
        async
        fn send(&mut self, msg: &[u8]) -> std::io::Result<usize>
        {
            use crate::tokio::io::AsyncWriteExt;

            let sock =
                self
                    .sock
                    .as_mut()
                    .ok_or_else(||
                        std::io::Error::new(ErrorKind::NotConnected, "no connection")
                    )?;

            sock.write_all(msg).await?;

            // tokio's `File` may complete a write in the background; flushing
            // waits for it, so a write error is returned from this call and
            // not from some later operation.
            sock.flush().await?;

            return Ok(msg.len());
        }

        /// Resets the tap type to [TapType::None] and, if the file is
        /// opened, syncs its data and drops the handle.
        ///
        /// The handle is taken out of the tap before syncing, so the tap is
        /// disconnected even when `sync_data` returns an error.
        async
        fn disconnectlog(&mut self) -> std::io::Result<()>
        {
            // the type is reset regardless of whether a file was opened
            self.cur_tap_type = TapType::None;

            if let Some(s) = self.sock.take()
            {
                s.sync_data().await?;

                // `s` is dropped here, at the end of the scope
            }

            return Ok(());
        }

        /// Returns `true` when a file handle is currently stored in the tap.
        fn is_connected(&self) -> bool
        {
            return self.sock.is_some();
        }

        /// Returns the tap type recorded for this instance.
        fn get_type(&self) -> TapType
        {
            return self.cur_tap_type;
        }

        /// Returns the maximum message length reported by the tap data.
        fn get_max_msg_size(&self) -> usize
        {
            return self.tap_data.get_max_msg_len();
        }

        /// Replaces the stored tap configuration. An already opened file is
        /// not touched.
        fn update_tap_data(&mut self, tap_data: SyslogFile)
        {
            self.tap_data = tap_data;
        }
    }
}
690