Skip to main content

syslog_rs/a_sync/async_tokio/
async_socket.rs

1/*-
2 * syslog-rs - a syslog client translated from libc to rust
3 * 
4 * Copyright 2025 Aleksandr Morozov
5 * 
6 * The syslog-rs crate can be redistributed and/or modified
7 * under the terms of either of the following licenses:
8 *
9 *   1. the Mozilla Public License Version 2.0 (the “MPL”) OR
10 *
11 *   2. The MIT License (MIT)
12 *                     
13 *   3. EUROPEAN UNION PUBLIC LICENCE v. 1.2 EUPL © the European Union 2007, 2016
14 */
15
16
17
18
19#[cfg(feature = "build_ext_tls")]
20mod with_tls
21{
22    use std::io::ErrorKind;
23
24    use tokio::{io::AsyncWriteExt, net::{TcpSocket, TcpStream}, time};
25    use tokio_rustls::{TlsConnector};
26    use tokio_rustls::client::TlsStream;
27
28    use crate::{a_sync::{AsyncSyslogTap, AsyncTap}, error::SyRes, map_error, map_error_os, SyslogDestMsg, SyslogTls, TapType};
29
30    impl AsyncSyslogTap<SyslogTls> for AsyncTap<TlsStream<TcpStream>, SyslogTls>
31    {
32        fn new(req_tap: SyslogTls) -> SyRes<Self>
33        {
34            let ret = 
35                Self
36                {
37                    sock: None, 
38                    tap_data: req_tap,
39                    cur_tap_type: TapType::NetTcp,
40                };
41
42            return Ok(ret);
43        }
44
45        async 
46        fn connectlog(&mut self) -> SyRes<()> 
47        {
48
49            let tcp_socket = 
50                if self.tap_data.get_remote_addr().is_ipv4() == true
51                {
52                    TcpSocket::new_v4()
53                }
54                else
55                {
56                    TcpSocket::new_v6()
57                }
58                .map_err(|e|
59                    map_error!("cannot create tcp socket '{}', error: {}", self.tap_data.get_remote_addr(), e)
60                )?;
61
62            // bind
63            if self.tap_data.get_bind_addr().ip().is_unspecified() == false
64            {
65                tcp_socket
66                    .bind(*self.tap_data.get_bind_addr())
67                    .map_err(|e|
68                        map_error!("cannot bind tcp socket '{}', error: {}", self.tap_data.get_remote_addr(), e)
69                    )?;
70            }
71                
72            let socket = 
73                if let Some(c_timeout) = self.tap_data.get_get_conn_timeout()
74                {
75                    time::timeout(c_timeout, tcp_socket.connect(*self.tap_data.get_remote_addr()))
76                        .await
77                        .map_err(|e|
78                            map_error!("can not connect to tcp '{}', error: {}", self.tap_data.get_remote_addr(), e)
79                        )?
80                }
81                else
82                {
83                    tcp_socket
84                        .connect(*self.tap_data.get_remote_addr())
85                        .await
86                }
87                .map_err(|e|
88                    map_error_os!(e, "cannot connect to tcp '{}', error: {}", self.tap_data.get_remote_addr(), e)
89                )?;
90
91            let connector = TlsConnector::from(self.tap_data.get_client_config());
92
93            let stream = 
94                connector
95                    .connect(self.tap_data.get_serv_name(), socket)
96                    .await
97                    .map_err(|e|
98                        map_error!("Remote server: '{}', TLS connector error: '{}'", self.tap_data.get_remote_addr(), e)
99                    )?;
100
101            // set socket
102            self.sock = Some(stream);
103
104            return Ok(());
105        }
106
107        #[inline]
108        async 
109        fn send(&mut self, msg: &[u8]) -> std::io::Result<usize> 
110        {
111            let sock = 
112                self.sock
113                    .as_mut()
114                    .ok_or_else(||
115                        std::io::Error::new(ErrorKind::NotConnected, "no connection")
116                    )?;
117
118            sock.write_all(msg).await?;
119            sock.flush().await?;
120
121            return Ok(msg.len());
122        }
123
124        async 
125        fn disconnectlog(&mut self) -> std::io::Result<()> 
126        {
127            match self.sock.take()
128            {
129                Some(mut s) => 
130                {
131                    s.shutdown().await?;
132
133                    drop(s);
134
135                    Ok(())
136                },
137                None =>
138                {
139                    Ok(())
140                }
141            }
142        }
143
144        #[inline]
145        fn is_connected(&self) -> bool 
146        {
147            return self.sock.is_some();
148        }
149
150        #[inline]
151        fn get_type(&self) -> TapType 
152        {
153            return self.cur_tap_type;
154        }
155
156        #[inline]
157        fn get_max_msg_size() -> usize
158        {
159            return crate::SyslogTls::get_max_msg_len();
160        }
161
162        #[inline]
163        fn update_tap_data(&mut self, tap_data: SyslogTls)
164        {
165            self.tap_data = tap_data;
166        }
167    }
168}
169
170
171#[cfg(feature = "build_ext_net")]
172mod with_net
173{
174    use std::io::ErrorKind;
175
176    use tokio::{io::AsyncWriteExt, net::{TcpSocket, TcpStream, UdpSocket}, time};
177
178    use crate::{a_sync::{AsyncSyslogTap, AsyncTap}, error::SyRes, map_error, map_error_os, SyslogDestMsg, SyslogNetTcp, SyslogNetUdp, TapType};
179
180    impl AsyncSyslogTap<SyslogNetTcp> for AsyncTap<TcpStream, SyslogNetTcp>
181    {
182        fn new(req_tap: SyslogNetTcp) -> SyRes<Self>
183        {
184            let ret = 
185                Self
186                {
187                    sock: None, 
188                    tap_data: req_tap,
189                    cur_tap_type: TapType::NetTcp,
190                };
191
192            return Ok(ret);
193        }
194
195        async 
196        fn connectlog(&mut self) -> SyRes<()> 
197        {
198            let tcp_socket = 
199                if self.tap_data.get_remote_addr().is_ipv4() == true
200                {
201                    TcpSocket::new_v4()
202                }
203                else
204                {
205                    TcpSocket::new_v6()
206                }
207                .map_err(|e|
208                    map_error!("cannot create tcp socket '{}', error: {}", self.tap_data.get_remote_addr(), e)
209                )?;
210
211            // bind
212            if self.tap_data.get_bind_addr().ip().is_unspecified() == false
213            {
214                tcp_socket
215                    .bind(*self.tap_data.get_bind_addr())
216                    .map_err(|e|
217                        map_error!("cannot bind tcp socket '{}', error: {}", self.tap_data.get_remote_addr(), e)
218                    )?;
219            }
220                
221            let socket = 
222                if let Some(c_timeout) = self.tap_data.get_conn_timeout()
223                {
224                    time::timeout(c_timeout, tcp_socket.connect(*self.tap_data.get_remote_addr()))
225                        .await
226                        .map_err(|e|
227                            map_error!("can not connect to tcp '{}', error: {}", self.tap_data.get_remote_addr(), e)
228                        )?
229                }
230                else
231                {
232                    tcp_socket
233                        .connect(*self.tap_data.get_remote_addr())
234                        .await
235                }
236                .map_err(|e|
237                    map_error_os!(e, "can not connect to tcp '{}', error: {}", self.tap_data.get_remote_addr(), e)
238                )?;
239
240
241            // set socket
242            self.sock = Some(socket);
243
244            return Ok(());
245        }
246
247        #[inline]
248        async 
249        fn send(&mut self, msg: &[u8]) -> std::io::Result<usize> 
250        {
251            let sock = 
252                self.sock
253                    .as_mut()
254                    .ok_or_else(||
255                        std::io::Error::new(ErrorKind::NotConnected, "no connection")
256                    )?;
257
258            sock.write_all(msg).await?;
259            sock.flush().await?;
260
261            return Ok(msg.len());
262        }
263
264        async 
265        fn disconnectlog(&mut self) -> std::io::Result<()> 
266        {
267            match self.sock.take()
268            {
269                Some(mut s) => 
270                {
271                    s.shutdown().await?;
272
273                    drop(s);
274
275                    Ok(())
276                },
277                None =>
278                {
279                    Ok(())
280                }
281            }
282        }
283
284        #[inline]
285        fn is_connected(&self) -> bool 
286        {
287            return self.sock.is_some();
288        }
289
290        #[inline]
291        fn get_type(&self) -> TapType 
292        {
293            return self.cur_tap_type;
294        }
295
296        #[inline]
297        fn get_max_msg_size() -> usize
298        {
299            return crate::SyslogNetTcp::get_max_msg_len();
300        }
301
302        #[inline]
303        fn update_tap_data(&mut self, tap_data: SyslogNetTcp)
304        {
305            self.tap_data = tap_data;
306        }
307    }
308
309    impl AsyncSyslogTap<SyslogNetUdp> for AsyncTap<UdpSocket, SyslogNetUdp>
310    {
311        fn new(req_tap: SyslogNetUdp) -> SyRes<Self>
312        {
313            let ret = 
314                Self
315                {
316                    sock: None, 
317                    tap_data: req_tap,
318                    cur_tap_type: TapType::NetUdp,
319                };
320
321            return Ok(ret);
322        }
323
324        async 
325        fn connectlog(&mut self) -> SyRes<()> 
326        {
327            let socket = 
328                UdpSocket
329                    ::bind(self.tap_data.get_bind_addr())
330                        .await
331                        .map_err(|e|
332                            map_error_os!(e, "can not bind udp '{}', error: {}", self.tap_data.get_bind_addr(), e)
333                        )?;
334
335            socket
336                .connect(self.tap_data.get_remote_addr())
337                .await
338                .map_err(|e|
339                    map_error_os!(e, "can not connect to udp '{}', error: {}", self.tap_data.get_remote_addr(), e)
340                )?;
341
342            // set socket
343            self.sock = Some(socket);
344
345            return Ok(());
346        }
347
348        #[inline]
349        async 
350        fn send(&mut self, msg: &[u8]) -> std::io::Result<usize> 
351        {
352            let sock = 
353                self
354                    .sock
355                    .as_mut()
356                    .ok_or_else(||
357                        std::io::Error::new(ErrorKind::NotConnected, "no connection")
358                    )?;
359
360            return sock.send(msg).await;
361        }
362
363        async
364        fn disconnectlog(&mut self) -> std::io::Result<()> 
365        {
366            match self.sock.take()
367            {
368                Some(s) => 
369                {
370                    drop(s);
371
372                    Ok(())
373                },
374                None =>
375                {
376                    Ok(())
377                }
378            }
379        }
380
381        #[inline]
382        fn is_connected(&self) -> bool 
383        {
384            return self.sock.is_some();
385        }
386
387        #[inline]
388        fn get_type(&self) -> TapType 
389        {
390            return self.cur_tap_type;
391        }
392
393        #[inline]
394        fn get_max_msg_size() -> usize
395        {
396            return crate::SyslogNetUdp::get_max_msg_len();
397        }
398
399        #[inline]
400        fn update_tap_data(&mut self, tap_data: SyslogNetUdp)
401        {
402            self.tap_data = tap_data;
403        }
404    }
405}
406
407
408#[cfg(target_family = "unix")]
409mod with_unix_syslog
410{
411    use std::io::ErrorKind;
412    use std::net::Shutdown;
413
414    use tokio::net::{UnixDatagram};
415    use nix::errno::Errno;
416
417    use crate::a_sync::{AsyncSyslogTap, AsyncTap};
418    use crate::{SyslogDestMsg, SyslogLocal, TapType, map_error_os, throw_error_errno, throw_error_os};
419    use crate::common::*;
420    use crate::error::SyRes;
421
422
423    impl AsyncSyslogTap<SyslogLocal> for AsyncTap<UnixDatagram, SyslogLocal>
424    {
425
426        fn new(req_tap: SyslogLocal) -> SyRes<Self>
427        {
428            return Ok(
429                Self
430                {
431                    sock: None, 
432                    tap_data: req_tap,
433                    cur_tap_type: TapType::None,
434                }
435            );
436        }
437
438
439        async 
440        fn connectlog(&mut self) -> SyRes<()> 
441        {
442            let sock = 
443                UnixDatagram
444                    ::unbound()
445                        .map_err(|e|
446                            map_error_os!(e, "unbounded unix datagram initialization failure: {}", e)
447                        )?;
448
449            let tap_type = 
450                if self.tap_data.get_use_alternative() == false && self.tap_data.get_custom_remote_path().is_some() == true 
451                {
452                    if let Err(e) = sock.connect(self.tap_data.get_custom_remote_path().as_ref().unwrap())
453                    {
454                        throw_error_os!(e, "failed to open connection to syslog server at '{}'", 
455                            self.tap_data.get_custom_remote_path().as_ref().unwrap().display());
456                    }
457                    else 
458                    {
459                        TapType::CustomLog
460                    }
461                }
462                else if self.tap_data.get_custom_remote_path().is_some() == true &&
463                    sock.connect(self.tap_data.get_custom_remote_path().as_ref().unwrap()).is_ok() == true
464                {
465                    TapType::CustomLog
466                }
467                else if let Ok(_) = sock.connect(PATH_LOG_PRIV)
468                {
469                    TapType::Priv
470                }
471                else if let Ok(_) = sock.connect(PATH_LOG)
472                {
473                    TapType::UnPriv
474                }
475                else if let Ok(_) = sock.connect(PATH_OLDLOG)
476                {
477                    TapType::OldLog
478                }
479                else if let Ok(_) = sock.connect(PATH_OSX)
480                {
481                    TapType::Priv
482                }
483                else
484                {
485                    // failed to open socket
486                    throw_error_errno!(Errno::last(), "failed to open connection to syslog server");
487                };
488
489            #[cfg(any(
490                target_os = "freebsd",
491                target_os = "dragonfly",
492                target_os = "openbsd",
493                target_os = "netbsd",
494                target_os = "macos"
495            ))]
496            {
497                use std::os::fd::AsRawFd;
498
499                // setting sendbuf
500                let mut len: std::mem::MaybeUninit<nix::libc::socklen_t> = std::mem::MaybeUninit::uninit();
501
502                // set the sndbuf len
503                let res = 
504                    unsafe
505                    {
506                        nix::libc::getsockopt(
507                            sock.as_raw_fd(), 
508                            nix::libc::SOL_SOCKET, 
509                            nix::libc::SO_SNDBUF, 
510                            len.as_mut_ptr() as *mut nix::libc::c_void, 
511                            &mut { 
512                                std::mem::size_of::<nix::libc::socklen_t>() as nix::libc::socklen_t
513                            } 
514                        )
515                    };
516        
517                if res == 0
518                {
519                    let mut len = unsafe { len.assume_init() } as usize;
520
521                    if len < MAXLINE
522                    {
523                        len = MAXLINE;
524
525                        unsafe {
526                            nix::libc::setsockopt(
527                                sock.as_raw_fd(), 
528                                nix::libc::SOL_SOCKET, 
529                                nix::libc::SO_SNDBUF, 
530                                &len as *const _ as *const nix::libc::c_void, 
531                                std::mem::size_of::<nix::libc::socklen_t>() as nix::libc::socklen_t
532                            )
533                        };
534                    }
535                }
536            }
537
538            sock
539                .shutdown(std::net::Shutdown::Read)
540                .map_err(|e|
541                    map_error_os!(e, "can not shutdown read portion, error: '{}'", e)
542                )?;
543
544            self.sock = Some(sock);
545            self.cur_tap_type = tap_type;
546
547            return Ok(());
548        }
549
550        #[inline]
551        async 
552        fn send(&mut self, msg: &[u8]) -> std::io::Result<usize> 
553        {
554            let sock = 
555                self.sock
556                    .as_mut()
557                    .ok_or_else(||
558                        std::io::Error::new(ErrorKind::NotConnected, "no connection")
559                    )?;
560
561            return sock.send(msg).await;
562        }
563
564        async 
565        fn disconnectlog(&mut self) -> std::io::Result<()>
566        {
567            match self.sock.take()
568            {
569                Some(s) => 
570                {
571                    self.cur_tap_type = TapType::None;
572
573                    s.shutdown(Shutdown::Both)
574                },
575                None =>
576                {
577                    self.cur_tap_type = TapType::None;
578
579                    Ok(())
580                }
581            }
582        }
583
584        #[inline]
585        fn is_connected(&self) -> bool
586        {
587            return self.sock.is_some();
588        }
589
590        #[inline]
591        fn get_type(&self) -> TapType 
592        {
593            return self.cur_tap_type;
594        }
595
596        #[inline]
597        fn get_max_msg_size() -> usize
598        {
599            return crate::SyslogLocal::get_max_msg_len();
600        }
601
602        #[inline]
603        fn update_tap_data(&mut self, tap_data: SyslogLocal)
604        {
605            self.tap_data = tap_data;
606        }
607    }
608}
609
610
611
612#[cfg(feature = "build_ext_file")]
613mod with_file
614{
615    use std::io::ErrorKind;
616
617    use tokio::fs::File;
618
619    use crate::{a_sync::{AsyncSyslogTap, AsyncTap}, error::SyRes,  map_error_os, SyslogDestMsg, SyslogFile, TapType};
620
621    impl AsyncSyslogTap<SyslogFile> for AsyncTap<File, SyslogFile>
622    {
623        fn new(req_tap: SyslogFile) -> SyRes<Self>
624        {
625            let ret = 
626                Self
627                {
628                    sock: None, 
629                    tap_data: req_tap,
630                    cur_tap_type: TapType::LocalFile,
631                };
632
633            return Ok(ret);
634        }
635
636        async 
637        fn connectlog(&mut self) -> SyRes<()> 
638        {
639
640            let file = 
641                File::options()
642                    .append(true)
643                    .read(true)
644                    .write(true)
645                    .create(true)
646                    .open(self.tap_data.get_path())
647                    .await
648                    .map_err(|e|
649                        map_error_os!(e, "can not open file '{}' to write, error: '{}'", 
650                            self.tap_data.get_path().display(), e)
651                    )?;
652
653            self.sock = Some(file);
654
655            return Ok(());
656        }
657
658        #[inline]
659        async 
660        fn send(&mut self, msg: &[u8]) -> std::io::Result<usize> 
661        {
662            use crate::tokio::io::AsyncWriteExt;
663            
664            let sock = 
665                self
666                    .sock
667                    .as_mut()
668                    .ok_or_else(||
669                        std::io::Error::new(ErrorKind::NotConnected, "no connection")
670                    )?;
671            
672            sock.write_all(msg).await?;
673
674            return Ok(msg.len());
675        }
676
677        async 
678        fn disconnectlog(&mut self) -> std::io::Result<()> 
679        {
680            match self.sock.take()
681            {
682                Some(s) => 
683                {
684                    self.cur_tap_type = TapType::None;
685
686                    s.sync_data().await?;
687
688                    drop(s);
689
690                    return Ok(());
691                },
692                None =>
693                {
694                    self.cur_tap_type = TapType::None;
695
696                    return Ok(())
697                }
698            }
699        }
700
701        #[inline]
702        fn is_connected(&self) -> bool 
703        {
704            return self.sock.is_some();
705        }
706
707        #[inline]
708        fn get_type(&self) -> TapType 
709        {
710            return self.cur_tap_type;
711        }
712
713        #[inline]
714        fn get_max_msg_size() -> usize
715        {
716            return crate::SyslogFile::get_max_msg_len();
717        }
718
719        #[inline]
720        fn update_tap_data(&mut self, tap_data: SyslogFile)
721        {
722            self.tap_data = tap_data;
723        }
724    }
725}
726
727
728// --- windows ---
729
730#[cfg(target_family = "windows")]
731mod with_windows_eventlog
732{
733    use std::{fmt};
734    use std::io::ErrorKind;
735
736    use crate::portable::{EventLogLocal};
737    use crate::{error::SyRes, TapType};
738
739
740    use crate::
741    {
742        AsyncSyslogTap, AsyncTap, SyslogDestMsg, WindowsEvent, map_error_os
743    };
744
745    use crate::common::*;
746
747    impl AsyncSyslogTap<WindowsEvent> for AsyncTap<EventLogLocal, WindowsEvent>
748    {
749        /// Creates the instance for local socket.
750        /// 
751        /// If `opt_path` does not exist or can not be opened, in case if 
752        /// flag `use_altern` is set to true, other known localions will 
753        /// be attempted to connect before giving up.
754        /// 
755        /// # Arguments
756        /// 
757        /// * `req_tap` - a type of the tap which is required. See [TapTypeData] 
758        ///     The actual type of the tap selected will be stored in the field 
759        ///     `cur_tap_type`.
760        fn new(req_tap: WindowsEvent) -> SyRes<Self>
761        {
762            return Ok(
763                Self
764                {
765                    sock: None, 
766                    tap_data: req_tap,
767                    cur_tap_type: TapType::None,
768                }
769            );
770        }
771
772        async 
773        fn connectlog(&mut self) -> SyRes<()> 
774        {
775            let sock = 
776                EventLogLocal::new(&self.tap_data)
777                    .map_err(|e|
778                        map_error_os!(e, "EventRegister error: '{}'", e)
779                    )?;
780
781            self.sock = Some(sock);
782            self.cur_tap_type = TapType::WindowsEventLog;
783
784            return Ok(());
785        }
786
787        #[inline]
788        async 
789        fn send(&mut self, msg: &[u8]) -> std::io::Result<usize> 
790        {
791            let sock = 
792                self
793                    .sock
794                    .as_ref()
795                    .ok_or_else(||
796                        std::io::Error::new(ErrorKind::NotConnected, "no connection")
797                    )?;
798
799            return sock.send(msg);
800        }
801
802        async 
803        fn disconnectlog(&mut self) -> std::io::Result<()> 
804        {
805            match self.sock.take()
806            {
807                Some(s) => 
808                {
809                    self.cur_tap_type = TapType::None;
810
811                    drop(s);
812
813                    Ok(())
814                },
815                None =>
816                {
817                    self.cur_tap_type = TapType::None;
818
819                    Ok(())
820                }
821            }
822        }
823
824        #[inline]
825        fn is_connected(&self) -> bool
826        {
827            return self.sock.is_some();
828        }
829
830        #[inline]
831        fn get_type(&self) -> TapType 
832        {
833            return self.cur_tap_type;
834        }
835
836        #[inline]
837        fn get_max_msg_size() -> usize
838        {
839            return WindowsEvent::get_max_msg_len();
840        }
841
842        #[inline]
843        fn update_tap_data(&mut self, tap_data: WindowsEvent)
844        {
845            self.tap_data = tap_data;
846        }
847    }
848}
849