syslog_rs/a_sync/async_tokio/
async_socket.rs

1/*-
2 * syslog-rs - a syslog client translated from libc to rust
3 * 
4 * Copyright 2025 Aleksandr Morozov
5 * 
6 * The syslog-rs crate can be redistributed and/or modified
7 * under the terms of either of the following licenses:
8 *
9 *   1. the Mozilla Public License Version 2.0 (the “MPL”) OR
10 *                     
11 *   2. EUROPEAN UNION PUBLIC LICENCE v. 1.2 EUPL © the European Union 2007, 2016
12 */
13
14
15use std::io::ErrorKind;
16use std::net::Shutdown;
17
18use tokio::net::{UnixDatagram};
19use nix::errno::Errno;
20
21use crate::a_sync::{AsyncSyslogTap, AsyncTap};
22
23
24
25use crate::{map_error_os, throw_error_errno, throw_error_os, SyslogDestMsg, SyslogLocal, TapType};
26
27use crate::common::*;
28use crate::error::SyRes;
29
30
31#[cfg(feature = "build_ext_tls")]
32mod with_tls
33{
34    use std::io::ErrorKind;
35
36    use tokio::{io::AsyncWriteExt, net::{TcpSocket, TcpStream}, time};
37    use tokio_rustls::{TlsConnector};
38    use tokio_rustls::client::TlsStream;
39
40    use crate::{a_sync::{AsyncSyslogTap, AsyncTap}, error::SyRes, map_error, map_error_os, SyslogDestMsg, SyslogTls, TapType};
41
42    impl AsyncSyslogTap<SyslogTls> for AsyncTap<TlsStream<TcpStream>, SyslogTls>
43    {
44        fn new(req_tap: SyslogTls) -> SyRes<Self>
45        {
46            let ret = 
47                Self
48                {
49                    sock: None, 
50                    tap_data: req_tap,
51                    cur_tap_type: TapType::NetTcp,
52                };
53
54            return Ok(ret);
55        }
56
57        async 
58        fn connectlog(&mut self) -> SyRes<()> 
59        {
60
61            let tcp_socket = 
62                if self.tap_data.get_remote_addr().is_ipv4() == true
63                {
64                    TcpSocket::new_v4()
65                }
66                else
67                {
68                    TcpSocket::new_v6()
69                }
70                .map_err(|e|
71                    map_error!("cannot create tcp socket '{}', error: {}", self.tap_data.get_remote_addr(), e)
72                )?;
73
74            // bind
75            if self.tap_data.get_bind_addr().ip().is_unspecified() == false
76            {
77                tcp_socket
78                    .bind(*self.tap_data.get_bind_addr())
79                    .map_err(|e|
80                        map_error!("cannot bind tcp socket '{}', error: {}", self.tap_data.get_remote_addr(), e)
81                    )?;
82            }
83                
84            let socket = 
85                if let Some(c_timeout) = self.tap_data.get_get_conn_timeout()
86                {
87                    time::timeout(c_timeout, tcp_socket.connect(*self.tap_data.get_remote_addr()))
88                        .await
89                        .map_err(|e|
90                            map_error!("can not connect to tcp '{}', error: {}", self.tap_data.get_remote_addr(), e)
91                        )?
92                }
93                else
94                {
95                    tcp_socket
96                        .connect(*self.tap_data.get_remote_addr())
97                        .await
98                }
99                .map_err(|e|
100                    map_error_os!(e, "cannot connect to tcp '{}', error: {}", self.tap_data.get_remote_addr(), e)
101                )?;
102
103            let connector = TlsConnector::from(self.tap_data.get_client_config());
104
105            let stream = 
106                connector
107                    .connect(self.tap_data.get_serv_name(), socket)
108                    .await
109                    .map_err(|e|
110                        map_error!("Remote server: '{}', TLS connector error: '{}'", self.tap_data.get_remote_addr(), e)
111                    )?;
112
113            // set socket
114            self.sock = Some(stream);
115
116            return Ok(());
117        }
118
119        #[inline]
120        async 
121        fn send(&mut self, msg: &[u8]) -> std::io::Result<usize> 
122        {
123            let sock = 
124                self.sock
125                    .as_mut()
126                    .ok_or_else(||
127                        std::io::Error::new(ErrorKind::NotConnected, "no connection")
128                    )?;
129
130            sock.write_all(msg).await?;
131            sock.flush().await?;
132
133            return Ok(msg.len());
134        }
135
136        async 
137        fn disconnectlog(&mut self) -> std::io::Result<()> 
138        {
139            match self.sock.take()
140            {
141                Some(mut s) => 
142                {
143                    s.shutdown().await?;
144
145                    drop(s);
146
147                    Ok(())
148                },
149                None =>
150                {
151                    Ok(())
152                }
153            }
154        }
155
156        #[inline]
157        fn is_connected(&self) -> bool 
158        {
159            return self.sock.is_some();
160        }
161
162        #[inline]
163        fn get_type(&self) -> TapType 
164        {
165            return self.cur_tap_type;
166        }
167
168        #[inline]
169        fn get_max_msg_size() -> usize
170        {
171            return crate::SyslogTls::get_max_msg_len();
172        }
173
174        #[inline]
175        fn update_tap_data(&mut self, tap_data: SyslogTls)
176        {
177            self.tap_data = tap_data;
178        }
179    }
180}
181
182
183#[cfg(feature = "build_ext_net")]
184mod with_net
185{
186    use std::io::ErrorKind;
187
188    use tokio::{io::AsyncWriteExt, net::{TcpSocket, TcpStream, UdpSocket}, time};
189
190    use crate::{a_sync::{AsyncSyslogTap, AsyncTap}, error::SyRes, map_error, map_error_os, SyslogDestMsg, SyslogNetTcp, SyslogNetUdp, TapType};
191
192    impl AsyncSyslogTap<SyslogNetTcp> for AsyncTap<TcpStream, SyslogNetTcp>
193    {
194        fn new(req_tap: SyslogNetTcp) -> SyRes<Self>
195        {
196            let ret = 
197                Self
198                {
199                    sock: None, 
200                    tap_data: req_tap,
201                    cur_tap_type: TapType::NetTcp,
202                };
203
204            return Ok(ret);
205        }
206
207        async 
208        fn connectlog(&mut self) -> SyRes<()> 
209        {
210            let tcp_socket = 
211                if self.tap_data.get_remote_addr().is_ipv4() == true
212                {
213                    TcpSocket::new_v4()
214                }
215                else
216                {
217                    TcpSocket::new_v6()
218                }
219                .map_err(|e|
220                    map_error!("cannot create tcp socket '{}', error: {}", self.tap_data.get_remote_addr(), e)
221                )?;
222
223            // bind
224            if self.tap_data.get_bind_addr().ip().is_unspecified() == false
225            {
226                tcp_socket
227                    .bind(*self.tap_data.get_bind_addr())
228                    .map_err(|e|
229                        map_error!("cannot bind tcp socket '{}', error: {}", self.tap_data.get_remote_addr(), e)
230                    )?;
231            }
232                
233            let socket = 
234                if let Some(c_timeout) = self.tap_data.get_conn_timeout()
235                {
236                    time::timeout(c_timeout, tcp_socket.connect(*self.tap_data.get_remote_addr()))
237                        .await
238                        .map_err(|e|
239                            map_error!("can not connect to tcp '{}', error: {}", self.tap_data.get_remote_addr(), e)
240                        )?
241                }
242                else
243                {
244                    tcp_socket
245                        .connect(*self.tap_data.get_remote_addr())
246                        .await
247                }
248                .map_err(|e|
249                    map_error_os!(e, "can not connect to tcp '{}', error: {}", self.tap_data.get_remote_addr(), e)
250                )?;
251
252
253            // set socket
254            self.sock = Some(socket);
255
256            return Ok(());
257        }
258
259        #[inline]
260        async 
261        fn send(&mut self, msg: &[u8]) -> std::io::Result<usize> 
262        {
263            let sock = 
264                self.sock
265                    .as_mut()
266                    .ok_or_else(||
267                        std::io::Error::new(ErrorKind::NotConnected, "no connection")
268                    )?;
269
270            sock.write_all(msg).await?;
271            sock.flush().await?;
272
273            return Ok(msg.len());
274        }
275
276        async 
277        fn disconnectlog(&mut self) -> std::io::Result<()> 
278        {
279            match self.sock.take()
280            {
281                Some(mut s) => 
282                {
283                    s.shutdown().await?;
284
285                    drop(s);
286
287                    Ok(())
288                },
289                None =>
290                {
291                    Ok(())
292                }
293            }
294        }
295
296        #[inline]
297        fn is_connected(&self) -> bool 
298        {
299            return self.sock.is_some();
300        }
301
302        #[inline]
303        fn get_type(&self) -> TapType 
304        {
305            return self.cur_tap_type;
306        }
307
308        #[inline]
309        fn get_max_msg_size() -> usize
310        {
311            return crate::SyslogNetTcp::get_max_msg_len();
312        }
313
314        #[inline]
315        fn update_tap_data(&mut self, tap_data: SyslogNetTcp)
316        {
317            self.tap_data = tap_data;
318        }
319    }
320
321    impl AsyncSyslogTap<SyslogNetUdp> for AsyncTap<UdpSocket, SyslogNetUdp>
322    {
323        fn new(req_tap: SyslogNetUdp) -> SyRes<Self>
324        {
325            let ret = 
326                Self
327                {
328                    sock: None, 
329                    tap_data: req_tap,
330                    cur_tap_type: TapType::NetUdp,
331                };
332
333            return Ok(ret);
334        }
335
336        async 
337        fn connectlog(&mut self) -> SyRes<()> 
338        {
339            let socket = 
340                UdpSocket
341                    ::bind(self.tap_data.get_bind_addr())
342                        .await
343                        .map_err(|e|
344                            map_error_os!(e, "can not bind udp '{}', error: {}", self.tap_data.get_bind_addr(), e)
345                        )?;
346
347            socket
348                .connect(self.tap_data.get_remote_addr())
349                .await
350                .map_err(|e|
351                    map_error_os!(e, "can not connect to udp '{}', error: {}", self.tap_data.get_remote_addr(), e)
352                )?;
353
354            // set socket
355            self.sock = Some(socket);
356
357            return Ok(());
358        }
359
360        #[inline]
361        async 
362        fn send(&mut self, msg: &[u8]) -> std::io::Result<usize> 
363        {
364            let sock = 
365                self
366                    .sock
367                    .as_mut()
368                    .ok_or_else(||
369                        std::io::Error::new(ErrorKind::NotConnected, "no connection")
370                    )?;
371
372            return sock.send(msg).await;
373        }
374
375        async
376        fn disconnectlog(&mut self) -> std::io::Result<()> 
377        {
378            match self.sock.take()
379            {
380                Some(s) => 
381                {
382                    drop(s);
383
384                    Ok(())
385                },
386                None =>
387                {
388                    Ok(())
389                }
390            }
391        }
392
393        #[inline]
394        fn is_connected(&self) -> bool 
395        {
396            return self.sock.is_some();
397        }
398
399        #[inline]
400        fn get_type(&self) -> TapType 
401        {
402            return self.cur_tap_type;
403        }
404
405        #[inline]
406        fn get_max_msg_size() -> usize
407        {
408            return crate::SyslogNetUdp::get_max_msg_len();
409        }
410
411        #[inline]
412        fn update_tap_data(&mut self, tap_data: SyslogNetUdp)
413        {
414            self.tap_data = tap_data;
415        }
416    }
417}
418
419
420impl AsyncSyslogTap<SyslogLocal> for AsyncTap<UnixDatagram, SyslogLocal>
421{
422
423    fn new(req_tap: SyslogLocal) -> SyRes<Self>
424    {
425        return Ok(
426            Self
427            {
428                sock: None, 
429                tap_data: req_tap,
430                cur_tap_type: TapType::None,
431            }
432        );
433    }
434
435
436    async 
437    fn connectlog(&mut self) -> SyRes<()> 
438    {
439        let sock = 
440            UnixDatagram
441                ::unbound()
442                    .map_err(|e|
443                        map_error_os!(e, "unbounded unix datagram initialization failure: {}", e)
444                    )?;
445
446        let tap_type = 
447            if self.tap_data.get_use_alternative() == false && self.tap_data.get_custom_remote_path().is_some() == true 
448            {
449                if let Err(e) = sock.connect(self.tap_data.get_custom_remote_path().as_ref().unwrap())
450                {
451                    throw_error_os!(e, "failed to open connection to syslog server at '{}'", 
452                        self.tap_data.get_custom_remote_path().as_ref().unwrap().display());
453                }
454                else 
455                {
456                    TapType::CustomLog
457                }
458            }
459            else if self.tap_data.get_custom_remote_path().is_some() == true &&
460                sock.connect(self.tap_data.get_custom_remote_path().as_ref().unwrap()).is_ok() == true
461            {
462                TapType::CustomLog
463            }
464            else if let Ok(_) = sock.connect(PATH_LOG_PRIV)
465            {
466                TapType::Priv
467            }
468            else if let Ok(_) = sock.connect(PATH_LOG)
469            {
470                TapType::UnPriv
471            }
472            else if let Ok(_) = sock.connect(PATH_OLDLOG)
473            {
474                TapType::OldLog
475            }
476            else if let Ok(_) = sock.connect(PATH_OSX)
477            {
478                TapType::Priv
479            }
480            else
481            {
482                // failed to open socket
483                throw_error_errno!(Errno::last(), "failed to open connection to syslog server");
484            };
485
486        #[cfg(any(
487            target_os = "freebsd",
488            target_os = "dragonfly",
489            target_os = "openbsd",
490            target_os = "netbsd",
491            target_os = "macos"
492        ))]
493        {
494            use std::os::fd::AsRawFd;
495
496            // setting sendbuf
497            let mut len: std::mem::MaybeUninit<nix::libc::socklen_t> = std::mem::MaybeUninit::uninit();
498
499            // set the sndbuf len
500            let res = 
501                unsafe
502                {
503                    nix::libc::getsockopt(
504                        sock.as_raw_fd(), 
505                        nix::libc::SOL_SOCKET, 
506                        nix::libc::SO_SNDBUF, 
507                        len.as_mut_ptr() as *mut nix::libc::c_void, 
508                        &mut { 
509                            std::mem::size_of::<nix::libc::socklen_t>() as nix::libc::socklen_t
510                        } 
511                    )
512                };
513    
514            if res == 0
515            {
516                let mut len = unsafe { len.assume_init() } as usize;
517
518                if len < MAXLINE
519                {
520                    len = MAXLINE;
521
522                    unsafe {
523                        nix::libc::setsockopt(
524                            sock.as_raw_fd(), 
525                            nix::libc::SOL_SOCKET, 
526                            nix::libc::SO_SNDBUF, 
527                            &len as *const _ as *const nix::libc::c_void, 
528                            std::mem::size_of::<nix::libc::socklen_t>() as nix::libc::socklen_t
529                        )
530                    };
531                }
532            }
533        }
534
535        sock
536            .shutdown(std::net::Shutdown::Read)
537            .map_err(|e|
538                map_error_os!(e, "can not shutdown read portion, error: '{}'", e)
539            )?;
540
541        self.sock = Some(sock);
542        self.cur_tap_type = tap_type;
543
544        return Ok(());
545    }
546
547    #[inline]
548    async 
549    fn send(&mut self, msg: &[u8]) -> std::io::Result<usize> 
550    {
551        let sock = 
552            self.sock
553                .as_mut()
554                .ok_or_else(||
555                    std::io::Error::new(ErrorKind::NotConnected, "no connection")
556                )?;
557
558        return sock.send(msg).await;
559    }
560
561    async 
562    fn disconnectlog(&mut self) -> std::io::Result<()>
563    {
564        match self.sock.take()
565        {
566            Some(s) => 
567            {
568                self.cur_tap_type = TapType::None;
569
570                s.shutdown(Shutdown::Both)
571            },
572            None =>
573            {
574                self.cur_tap_type = TapType::None;
575
576                Ok(())
577            }
578        }
579    }
580
581    #[inline]
582    fn is_connected(&self) -> bool
583    {
584        return self.sock.is_some();
585    }
586
587    #[inline]
588    fn get_type(&self) -> TapType 
589    {
590        return self.cur_tap_type;
591    }
592
593    #[inline]
594    fn get_max_msg_size() -> usize
595    {
596        return crate::SyslogLocal::get_max_msg_len();
597    }
598
599    #[inline]
600    fn update_tap_data(&mut self, tap_data: SyslogLocal)
601    {
602        self.tap_data = tap_data;
603    }
604}
605
606
607
608#[cfg(feature = "build_ext_file")]
609mod with_file
610{
611    use std::io::ErrorKind;
612
613    use tokio::fs::File;
614
615    use crate::{a_sync::{AsyncSyslogTap, AsyncTap}, error::SyRes,  map_error_os, SyslogDestMsg, SyslogFile, TapType};
616
617    impl AsyncSyslogTap<SyslogFile> for AsyncTap<File, SyslogFile>
618    {
619        fn new(req_tap: SyslogFile) -> SyRes<Self>
620        {
621            let ret = 
622                Self
623                {
624                    sock: None, 
625                    tap_data: req_tap,
626                    cur_tap_type: TapType::LocalFile,
627                };
628
629            return Ok(ret);
630        }
631
632        async 
633        fn connectlog(&mut self) -> SyRes<()> 
634        {
635
636            let file = 
637                File::options()
638                    .append(true)
639                    .read(true)
640                    .write(true)
641                    .create(true)
642                    .open(self.tap_data.get_path())
643                    .await
644                    .map_err(|e|
645                        map_error_os!(e, "can not open file '{}' to write, error: '{}'", 
646                            self.tap_data.get_path().display(), e)
647                    )?;
648
649            self.sock = Some(file);
650
651            return Ok(());
652        }
653
654        #[inline]
655        async 
656        fn send(&mut self, msg: &[u8]) -> std::io::Result<usize> 
657        {
658            use crate::tokio::io::AsyncWriteExt;
659            
660            let sock = 
661                self
662                    .sock
663                    .as_mut()
664                    .ok_or_else(||
665                        std::io::Error::new(ErrorKind::NotConnected, "no connection")
666                    )?;
667            
668            sock.write_all(msg).await?;
669
670            return Ok(msg.len());
671        }
672
673        async 
674        fn disconnectlog(&mut self) -> std::io::Result<()> 
675        {
676            match self.sock.take()
677            {
678                Some(s) => 
679                {
680                    self.cur_tap_type = TapType::None;
681
682                    s.sync_data().await?;
683
684                    drop(s);
685
686                    return Ok(());
687                },
688                None =>
689                {
690                    self.cur_tap_type = TapType::None;
691
692                    return Ok(())
693                }
694            }
695        }
696
697        #[inline]
698        fn is_connected(&self) -> bool 
699        {
700            return self.sock.is_some();
701        }
702
703        #[inline]
704        fn get_type(&self) -> TapType 
705        {
706            return self.cur_tap_type;
707        }
708
709        #[inline]
710        fn get_max_msg_size() -> usize
711        {
712            return crate::SyslogFile::get_max_msg_len();
713        }
714
715        #[inline]
716        fn update_tap_data(&mut self, tap_data: SyslogFile)
717        {
718            self.tap_data = tap_data;
719        }
720    }
721}
722