syslog_rs/a_sync/async_tokio/
async_socket.rs

1/*-
2 * syslog-rs - a syslog client translated from libc to rust
3 * 
4 * Copyright 2025 Aleksandr Morozov
5 * 
6 * The syslog-rs crate can be redistributed and/or modified
7 * under the terms of either of the following licenses:
8 *
9 *   1. the Mozilla Public License Version 2.0 (the “MPL”) OR
10 *
11 *   2. The MIT License (MIT)
12 *                     
13 *   3. EUROPEAN UNION PUBLIC LICENCE v. 1.2 EUPL © the European Union 2007, 2016
14 */
15
16
17use std::io::ErrorKind;
18use std::net::Shutdown;
19
20use tokio::net::{UnixDatagram};
21use nix::errno::Errno;
22
23use crate::a_sync::{AsyncSyslogTap, AsyncTap};
24
25
26
27use crate::{map_error_os, throw_error_errno, throw_error_os, SyslogDestMsg, SyslogLocal, TapType};
28
29use crate::common::*;
30use crate::error::SyRes;
31
32
33#[cfg(feature = "build_ext_tls")]
34mod with_tls
35{
36    use std::io::ErrorKind;
37
38    use tokio::{io::AsyncWriteExt, net::{TcpSocket, TcpStream}, time};
39    use tokio_rustls::{TlsConnector};
40    use tokio_rustls::client::TlsStream;
41
42    use crate::{a_sync::{AsyncSyslogTap, AsyncTap}, error::SyRes, map_error, map_error_os, SyslogDestMsg, SyslogTls, TapType};
43
44    impl AsyncSyslogTap<SyslogTls> for AsyncTap<TlsStream<TcpStream>, SyslogTls>
45    {
46        fn new(req_tap: SyslogTls) -> SyRes<Self>
47        {
48            let ret = 
49                Self
50                {
51                    sock: None, 
52                    tap_data: req_tap,
53                    cur_tap_type: TapType::NetTcp,
54                };
55
56            return Ok(ret);
57        }
58
59        async 
60        fn connectlog(&mut self) -> SyRes<()> 
61        {
62
63            let tcp_socket = 
64                if self.tap_data.get_remote_addr().is_ipv4() == true
65                {
66                    TcpSocket::new_v4()
67                }
68                else
69                {
70                    TcpSocket::new_v6()
71                }
72                .map_err(|e|
73                    map_error!("cannot create tcp socket '{}', error: {}", self.tap_data.get_remote_addr(), e)
74                )?;
75
76            // bind
77            if self.tap_data.get_bind_addr().ip().is_unspecified() == false
78            {
79                tcp_socket
80                    .bind(*self.tap_data.get_bind_addr())
81                    .map_err(|e|
82                        map_error!("cannot bind tcp socket '{}', error: {}", self.tap_data.get_remote_addr(), e)
83                    )?;
84            }
85                
86            let socket = 
87                if let Some(c_timeout) = self.tap_data.get_get_conn_timeout()
88                {
89                    time::timeout(c_timeout, tcp_socket.connect(*self.tap_data.get_remote_addr()))
90                        .await
91                        .map_err(|e|
92                            map_error!("can not connect to tcp '{}', error: {}", self.tap_data.get_remote_addr(), e)
93                        )?
94                }
95                else
96                {
97                    tcp_socket
98                        .connect(*self.tap_data.get_remote_addr())
99                        .await
100                }
101                .map_err(|e|
102                    map_error_os!(e, "cannot connect to tcp '{}', error: {}", self.tap_data.get_remote_addr(), e)
103                )?;
104
105            let connector = TlsConnector::from(self.tap_data.get_client_config());
106
107            let stream = 
108                connector
109                    .connect(self.tap_data.get_serv_name(), socket)
110                    .await
111                    .map_err(|e|
112                        map_error!("Remote server: '{}', TLS connector error: '{}'", self.tap_data.get_remote_addr(), e)
113                    )?;
114
115            // set socket
116            self.sock = Some(stream);
117
118            return Ok(());
119        }
120
121        #[inline]
122        async 
123        fn send(&mut self, msg: &[u8]) -> std::io::Result<usize> 
124        {
125            let sock = 
126                self.sock
127                    .as_mut()
128                    .ok_or_else(||
129                        std::io::Error::new(ErrorKind::NotConnected, "no connection")
130                    )?;
131
132            sock.write_all(msg).await?;
133            sock.flush().await?;
134
135            return Ok(msg.len());
136        }
137
138        async 
139        fn disconnectlog(&mut self) -> std::io::Result<()> 
140        {
141            match self.sock.take()
142            {
143                Some(mut s) => 
144                {
145                    s.shutdown().await?;
146
147                    drop(s);
148
149                    Ok(())
150                },
151                None =>
152                {
153                    Ok(())
154                }
155            }
156        }
157
158        #[inline]
159        fn is_connected(&self) -> bool 
160        {
161            return self.sock.is_some();
162        }
163
164        #[inline]
165        fn get_type(&self) -> TapType 
166        {
167            return self.cur_tap_type;
168        }
169
170        #[inline]
171        fn get_max_msg_size() -> usize
172        {
173            return crate::SyslogTls::get_max_msg_len();
174        }
175
176        #[inline]
177        fn update_tap_data(&mut self, tap_data: SyslogTls)
178        {
179            self.tap_data = tap_data;
180        }
181    }
182}
183
184
185#[cfg(feature = "build_ext_net")]
186mod with_net
187{
188    use std::io::ErrorKind;
189
190    use tokio::{io::AsyncWriteExt, net::{TcpSocket, TcpStream, UdpSocket}, time};
191
192    use crate::{a_sync::{AsyncSyslogTap, AsyncTap}, error::SyRes, map_error, map_error_os, SyslogDestMsg, SyslogNetTcp, SyslogNetUdp, TapType};
193
194    impl AsyncSyslogTap<SyslogNetTcp> for AsyncTap<TcpStream, SyslogNetTcp>
195    {
196        fn new(req_tap: SyslogNetTcp) -> SyRes<Self>
197        {
198            let ret = 
199                Self
200                {
201                    sock: None, 
202                    tap_data: req_tap,
203                    cur_tap_type: TapType::NetTcp,
204                };
205
206            return Ok(ret);
207        }
208
209        async 
210        fn connectlog(&mut self) -> SyRes<()> 
211        {
212            let tcp_socket = 
213                if self.tap_data.get_remote_addr().is_ipv4() == true
214                {
215                    TcpSocket::new_v4()
216                }
217                else
218                {
219                    TcpSocket::new_v6()
220                }
221                .map_err(|e|
222                    map_error!("cannot create tcp socket '{}', error: {}", self.tap_data.get_remote_addr(), e)
223                )?;
224
225            // bind
226            if self.tap_data.get_bind_addr().ip().is_unspecified() == false
227            {
228                tcp_socket
229                    .bind(*self.tap_data.get_bind_addr())
230                    .map_err(|e|
231                        map_error!("cannot bind tcp socket '{}', error: {}", self.tap_data.get_remote_addr(), e)
232                    )?;
233            }
234                
235            let socket = 
236                if let Some(c_timeout) = self.tap_data.get_conn_timeout()
237                {
238                    time::timeout(c_timeout, tcp_socket.connect(*self.tap_data.get_remote_addr()))
239                        .await
240                        .map_err(|e|
241                            map_error!("can not connect to tcp '{}', error: {}", self.tap_data.get_remote_addr(), e)
242                        )?
243                }
244                else
245                {
246                    tcp_socket
247                        .connect(*self.tap_data.get_remote_addr())
248                        .await
249                }
250                .map_err(|e|
251                    map_error_os!(e, "can not connect to tcp '{}', error: {}", self.tap_data.get_remote_addr(), e)
252                )?;
253
254
255            // set socket
256            self.sock = Some(socket);
257
258            return Ok(());
259        }
260
261        #[inline]
262        async 
263        fn send(&mut self, msg: &[u8]) -> std::io::Result<usize> 
264        {
265            let sock = 
266                self.sock
267                    .as_mut()
268                    .ok_or_else(||
269                        std::io::Error::new(ErrorKind::NotConnected, "no connection")
270                    )?;
271
272            sock.write_all(msg).await?;
273            sock.flush().await?;
274
275            return Ok(msg.len());
276        }
277
278        async 
279        fn disconnectlog(&mut self) -> std::io::Result<()> 
280        {
281            match self.sock.take()
282            {
283                Some(mut s) => 
284                {
285                    s.shutdown().await?;
286
287                    drop(s);
288
289                    Ok(())
290                },
291                None =>
292                {
293                    Ok(())
294                }
295            }
296        }
297
298        #[inline]
299        fn is_connected(&self) -> bool 
300        {
301            return self.sock.is_some();
302        }
303
304        #[inline]
305        fn get_type(&self) -> TapType 
306        {
307            return self.cur_tap_type;
308        }
309
310        #[inline]
311        fn get_max_msg_size() -> usize
312        {
313            return crate::SyslogNetTcp::get_max_msg_len();
314        }
315
316        #[inline]
317        fn update_tap_data(&mut self, tap_data: SyslogNetTcp)
318        {
319            self.tap_data = tap_data;
320        }
321    }
322
323    impl AsyncSyslogTap<SyslogNetUdp> for AsyncTap<UdpSocket, SyslogNetUdp>
324    {
325        fn new(req_tap: SyslogNetUdp) -> SyRes<Self>
326        {
327            let ret = 
328                Self
329                {
330                    sock: None, 
331                    tap_data: req_tap,
332                    cur_tap_type: TapType::NetUdp,
333                };
334
335            return Ok(ret);
336        }
337
338        async 
339        fn connectlog(&mut self) -> SyRes<()> 
340        {
341            let socket = 
342                UdpSocket
343                    ::bind(self.tap_data.get_bind_addr())
344                        .await
345                        .map_err(|e|
346                            map_error_os!(e, "can not bind udp '{}', error: {}", self.tap_data.get_bind_addr(), e)
347                        )?;
348
349            socket
350                .connect(self.tap_data.get_remote_addr())
351                .await
352                .map_err(|e|
353                    map_error_os!(e, "can not connect to udp '{}', error: {}", self.tap_data.get_remote_addr(), e)
354                )?;
355
356            // set socket
357            self.sock = Some(socket);
358
359            return Ok(());
360        }
361
362        #[inline]
363        async 
364        fn send(&mut self, msg: &[u8]) -> std::io::Result<usize> 
365        {
366            let sock = 
367                self
368                    .sock
369                    .as_mut()
370                    .ok_or_else(||
371                        std::io::Error::new(ErrorKind::NotConnected, "no connection")
372                    )?;
373
374            return sock.send(msg).await;
375        }
376
377        async
378        fn disconnectlog(&mut self) -> std::io::Result<()> 
379        {
380            match self.sock.take()
381            {
382                Some(s) => 
383                {
384                    drop(s);
385
386                    Ok(())
387                },
388                None =>
389                {
390                    Ok(())
391                }
392            }
393        }
394
395        #[inline]
396        fn is_connected(&self) -> bool 
397        {
398            return self.sock.is_some();
399        }
400
401        #[inline]
402        fn get_type(&self) -> TapType 
403        {
404            return self.cur_tap_type;
405        }
406
407        #[inline]
408        fn get_max_msg_size() -> usize
409        {
410            return crate::SyslogNetUdp::get_max_msg_len();
411        }
412
413        #[inline]
414        fn update_tap_data(&mut self, tap_data: SyslogNetUdp)
415        {
416            self.tap_data = tap_data;
417        }
418    }
419}
420
421
422impl AsyncSyslogTap<SyslogLocal> for AsyncTap<UnixDatagram, SyslogLocal>
423{
424
425    fn new(req_tap: SyslogLocal) -> SyRes<Self>
426    {
427        return Ok(
428            Self
429            {
430                sock: None, 
431                tap_data: req_tap,
432                cur_tap_type: TapType::None,
433            }
434        );
435    }
436
437
438    async 
439    fn connectlog(&mut self) -> SyRes<()> 
440    {
441        let sock = 
442            UnixDatagram
443                ::unbound()
444                    .map_err(|e|
445                        map_error_os!(e, "unbounded unix datagram initialization failure: {}", e)
446                    )?;
447
448        let tap_type = 
449            if self.tap_data.get_use_alternative() == false && self.tap_data.get_custom_remote_path().is_some() == true 
450            {
451                if let Err(e) = sock.connect(self.tap_data.get_custom_remote_path().as_ref().unwrap())
452                {
453                    throw_error_os!(e, "failed to open connection to syslog server at '{}'", 
454                        self.tap_data.get_custom_remote_path().as_ref().unwrap().display());
455                }
456                else 
457                {
458                    TapType::CustomLog
459                }
460            }
461            else if self.tap_data.get_custom_remote_path().is_some() == true &&
462                sock.connect(self.tap_data.get_custom_remote_path().as_ref().unwrap()).is_ok() == true
463            {
464                TapType::CustomLog
465            }
466            else if let Ok(_) = sock.connect(PATH_LOG_PRIV)
467            {
468                TapType::Priv
469            }
470            else if let Ok(_) = sock.connect(PATH_LOG)
471            {
472                TapType::UnPriv
473            }
474            else if let Ok(_) = sock.connect(PATH_OLDLOG)
475            {
476                TapType::OldLog
477            }
478            else if let Ok(_) = sock.connect(PATH_OSX)
479            {
480                TapType::Priv
481            }
482            else
483            {
484                // failed to open socket
485                throw_error_errno!(Errno::last(), "failed to open connection to syslog server");
486            };
487
488        #[cfg(any(
489            target_os = "freebsd",
490            target_os = "dragonfly",
491            target_os = "openbsd",
492            target_os = "netbsd",
493            target_os = "macos"
494        ))]
495        {
496            use std::os::fd::AsRawFd;
497
498            // setting sendbuf
499            let mut len: std::mem::MaybeUninit<nix::libc::socklen_t> = std::mem::MaybeUninit::uninit();
500
501            // set the sndbuf len
502            let res = 
503                unsafe
504                {
505                    nix::libc::getsockopt(
506                        sock.as_raw_fd(), 
507                        nix::libc::SOL_SOCKET, 
508                        nix::libc::SO_SNDBUF, 
509                        len.as_mut_ptr() as *mut nix::libc::c_void, 
510                        &mut { 
511                            std::mem::size_of::<nix::libc::socklen_t>() as nix::libc::socklen_t
512                        } 
513                    )
514                };
515    
516            if res == 0
517            {
518                let mut len = unsafe { len.assume_init() } as usize;
519
520                if len < MAXLINE
521                {
522                    len = MAXLINE;
523
524                    unsafe {
525                        nix::libc::setsockopt(
526                            sock.as_raw_fd(), 
527                            nix::libc::SOL_SOCKET, 
528                            nix::libc::SO_SNDBUF, 
529                            &len as *const _ as *const nix::libc::c_void, 
530                            std::mem::size_of::<nix::libc::socklen_t>() as nix::libc::socklen_t
531                        )
532                    };
533                }
534            }
535        }
536
537        sock
538            .shutdown(std::net::Shutdown::Read)
539            .map_err(|e|
540                map_error_os!(e, "can not shutdown read portion, error: '{}'", e)
541            )?;
542
543        self.sock = Some(sock);
544        self.cur_tap_type = tap_type;
545
546        return Ok(());
547    }
548
549    #[inline]
550    async 
551    fn send(&mut self, msg: &[u8]) -> std::io::Result<usize> 
552    {
553        let sock = 
554            self.sock
555                .as_mut()
556                .ok_or_else(||
557                    std::io::Error::new(ErrorKind::NotConnected, "no connection")
558                )?;
559
560        return sock.send(msg).await;
561    }
562
563    async 
564    fn disconnectlog(&mut self) -> std::io::Result<()>
565    {
566        match self.sock.take()
567        {
568            Some(s) => 
569            {
570                self.cur_tap_type = TapType::None;
571
572                s.shutdown(Shutdown::Both)
573            },
574            None =>
575            {
576                self.cur_tap_type = TapType::None;
577
578                Ok(())
579            }
580        }
581    }
582
583    #[inline]
584    fn is_connected(&self) -> bool
585    {
586        return self.sock.is_some();
587    }
588
589    #[inline]
590    fn get_type(&self) -> TapType 
591    {
592        return self.cur_tap_type;
593    }
594
595    #[inline]
596    fn get_max_msg_size() -> usize
597    {
598        return crate::SyslogLocal::get_max_msg_len();
599    }
600
601    #[inline]
602    fn update_tap_data(&mut self, tap_data: SyslogLocal)
603    {
604        self.tap_data = tap_data;
605    }
606}
607
608
609
610#[cfg(feature = "build_ext_file")]
611mod with_file
612{
613    use std::io::ErrorKind;
614
615    use tokio::fs::File;
616
617    use crate::{a_sync::{AsyncSyslogTap, AsyncTap}, error::SyRes,  map_error_os, SyslogDestMsg, SyslogFile, TapType};
618
619    impl AsyncSyslogTap<SyslogFile> for AsyncTap<File, SyslogFile>
620    {
621        fn new(req_tap: SyslogFile) -> SyRes<Self>
622        {
623            let ret = 
624                Self
625                {
626                    sock: None, 
627                    tap_data: req_tap,
628                    cur_tap_type: TapType::LocalFile,
629                };
630
631            return Ok(ret);
632        }
633
634        async 
635        fn connectlog(&mut self) -> SyRes<()> 
636        {
637
638            let file = 
639                File::options()
640                    .append(true)
641                    .read(true)
642                    .write(true)
643                    .create(true)
644                    .open(self.tap_data.get_path())
645                    .await
646                    .map_err(|e|
647                        map_error_os!(e, "can not open file '{}' to write, error: '{}'", 
648                            self.tap_data.get_path().display(), e)
649                    )?;
650
651            self.sock = Some(file);
652
653            return Ok(());
654        }
655
656        #[inline]
657        async 
658        fn send(&mut self, msg: &[u8]) -> std::io::Result<usize> 
659        {
660            use crate::tokio::io::AsyncWriteExt;
661            
662            let sock = 
663                self
664                    .sock
665                    .as_mut()
666                    .ok_or_else(||
667                        std::io::Error::new(ErrorKind::NotConnected, "no connection")
668                    )?;
669            
670            sock.write_all(msg).await?;
671
672            return Ok(msg.len());
673        }
674
675        async 
676        fn disconnectlog(&mut self) -> std::io::Result<()> 
677        {
678            match self.sock.take()
679            {
680                Some(s) => 
681                {
682                    self.cur_tap_type = TapType::None;
683
684                    s.sync_data().await?;
685
686                    drop(s);
687
688                    return Ok(());
689                },
690                None =>
691                {
692                    self.cur_tap_type = TapType::None;
693
694                    return Ok(())
695                }
696            }
697        }
698
699        #[inline]
700        fn is_connected(&self) -> bool 
701        {
702            return self.sock.is_some();
703        }
704
705        #[inline]
706        fn get_type(&self) -> TapType 
707        {
708            return self.cur_tap_type;
709        }
710
711        #[inline]
712        fn get_max_msg_size() -> usize
713        {
714            return crate::SyslogFile::get_max_msg_len();
715        }
716
717        #[inline]
718        fn update_tap_data(&mut self, tap_data: SyslogFile)
719        {
720            self.tap_data = tap_data;
721        }
722    }
723}
724