syslog_rs/a_sync/async_tokio/
async_socket.rs

1/*-
2 * syslog-rs - a syslog client translated from libc to rust
3 * 
4 * Copyright 2025 Aleksandr Morozov
5 * 
6 * The syslog-rs crate can be redistributed and/or modified
7 * under the terms of either of the following licenses:
8 *
9 *   1. the Mozilla Public License Version 2.0 (the “MPL”) OR
10 *                     
11 *   2. EUROPEAN UNION PUBLIC LICENCE v. 1.2 EUPL © the European Union 2007, 2016
12 */
13
14
15use std::io::ErrorKind;
16use std::net::Shutdown;
17
18use tokio::net::{UnixDatagram};
19use nix::errno::Errno;
20
21use crate::a_sync::{AsyncSyslogTap, AsyncTap};
22
23
24
25use crate::{map_error_os, throw_error_errno, throw_error_os, SyslogDestMsg, SyslogLocal, TapType};
26
27use crate::common::*;
28use crate::error::SyRes;
29
30
/// TLS over TCP transport for the tokio based asynchronous syslog tap.
///
/// Compiled only when the `build_ext_tls` feature is enabled.
#[cfg(feature = "build_ext_tls")]
mod with_tls
{
    use std::io::ErrorKind;

    use tokio::{io::AsyncWriteExt, net::{TcpSocket, TcpStream}, time};
    use tokio_rustls::{TlsConnector};
    use tokio_rustls::client::TlsStream;

    use crate::{a_sync::{AsyncSyslogTap, AsyncTap}, error::SyRes, map_error, map_error_os, SyslogDestMsg, SyslogTls, TapType};

    impl AsyncSyslogTap<SyslogTls> for AsyncTap<TlsStream<TcpStream>, SyslogTls>
    {
        /// Creates a tap which is not connected yet. The tap keeps `req_tap`
        /// as its configuration and reports `TapType::NetTcp` as its type.
        fn new(req_tap: SyslogTls) -> SyRes<Self>
        {
            return Ok(
                Self
                {
                    sock: None,
                    tap_data: req_tap,
                    cur_tap_type: TapType::NetTcp,
                }
            );
        }

        /// Opens a TCP connection to the configured remote address and
        /// performs the TLS handshake on top of it.
        ///
        /// The socket is bound to the configured bind address only when that
        /// address is not the unspecified one. When a connection timeout is
        /// configured, it limits the TCP connect only; the TLS handshake
        /// is awaited without that limit.
        ///
        /// # Errors
        ///
        /// Returns an error when the socket can not be created, bound or
        /// connected, when the connect times out, or when the TLS connector
        /// fails.
        async 
        fn connectlog(&mut self) -> SyRes<()> 
        {
            let remote_addr = *self.tap_data.get_remote_addr();
            let bind_addr = *self.tap_data.get_bind_addr();

            // the socket family follows the family of the remote address
            let new_socket = 
                if remote_addr.is_ipv4() == true
                {
                    TcpSocket::new_v4()
                }
                else
                {
                    TcpSocket::new_v6()
                };

            let tcp_socket = 
                new_socket
                    .map_err(|e|
                        map_error!("cannot create tcp socket '{}', error: {}", remote_addr, e)
                    )?;

            // bind
            if bind_addr.ip().is_unspecified() == false
            {
                // report the address which failed to bind, not the remote one
                tcp_socket
                    .bind(bind_addr)
                    .map_err(|e|
                        map_error!("cannot bind tcp socket '{}', error: {}", bind_addr, e)
                    )?;
            }

            // the future is lazy, nothing happens until it is awaited below
            let connecting = tcp_socket.connect(remote_addr);

            let conn_res = 
                match self.tap_data.get_get_conn_timeout()
                {
                    Some(c_timeout) =>
                    {
                        // the outer error is the elapsed timeout
                        time::timeout(c_timeout, connecting)
                            .await
                            .map_err(|e|
                                map_error!("can not connect to tcp '{}', error: {}", remote_addr, e)
                            )?
                    },
                    None =>
                    {
                        connecting.await
                    }
                };

            let socket = 
                conn_res
                    .map_err(|e|
                        map_error_os!(e, "cannot connect to tcp '{}', error: {}", remote_addr, e)
                    )?;

            let connector = TlsConnector::from(self.tap_data.get_client_config());

            let stream = 
                connector
                    .connect(self.tap_data.get_serv_name(), socket)
                    .await
                    .map_err(|e|
                        map_error!("Remote server: '{}', TLS connector error: '{}'", remote_addr, e)
                    )?;

            // set socket
            self.sock = Some(stream);

            return Ok(());
        }

        /// Writes the whole `msg` to the TLS stream and flushes it.
        ///
        /// Returns the length of `msg` on success or an error of kind
        /// `NotConnected` when there is no open connection.
        async 
        fn send(&mut self, msg: &[u8]) -> std::io::Result<usize> 
        {
            let Some(stream) = self.sock.as_mut()
            else
            {
                return Err(std::io::Error::new(ErrorKind::NotConnected, "no connection"));
            };

            stream.write_all(msg).await?;
            stream.flush().await?;

            return Ok(msg.len());
        }

        /// Shuts the stream down and drops it. Does nothing when the tap is
        /// not connected. The stream is dropped even if the shutdown fails.
        async 
        fn disconnectlog(&mut self) -> std::io::Result<()> 
        {
            if let Some(mut stream) = self.sock.take()
            {
                stream.shutdown().await?;
            }

            return Ok(());
        }

        /// Tells if the tap holds an open stream.
        fn is_connected(&self) -> bool 
        {
            self.sock.is_some()
        }

        /// Returns the type of the tap.
        fn get_type(&self) -> TapType 
        {
            self.cur_tap_type
        }

        /// Returns the max message length taken from the tap configuration.
        fn get_max_msg_size(&self) -> usize
        {
            self.tap_data.get_max_msg_len()
        }

        /// Replaces the tap configuration. An already opened connection is
        /// left untouched.
        fn update_tap_data(&mut self, tap_data: SyslogTls)
        {
            self.tap_data = tap_data;
        }
    }
}
176
177
/// Plain TCP and UDP transports for the tokio based asynchronous syslog tap.
///
/// Compiled only when the `build_ext_net` feature is enabled.
#[cfg(feature = "build_ext_net")]
mod with_net
{
    use std::io::ErrorKind;

    use tokio::{io::AsyncWriteExt, net::{TcpSocket, TcpStream, UdpSocket}, time};

    use crate::{a_sync::{AsyncSyslogTap, AsyncTap}, error::SyRes, map_error, map_error_os, SyslogDestMsg, SyslogNetTcp, SyslogNetUdp, TapType};

    impl AsyncSyslogTap<SyslogNetTcp> for AsyncTap<TcpStream, SyslogNetTcp>
    {
        /// Creates a tap which is not connected yet. The tap keeps `req_tap`
        /// as its configuration and reports `TapType::NetTcp` as its type.
        fn new(req_tap: SyslogNetTcp) -> SyRes<Self>
        {
            return Ok(
                Self
                {
                    sock: None,
                    tap_data: req_tap,
                    cur_tap_type: TapType::NetTcp,
                }
            );
        }

        /// Opens a TCP connection to the configured remote address.
        ///
        /// The socket is bound to the configured bind address only when that
        /// address is not the unspecified one. When a connection timeout is
        /// configured, the connect is limited by it.
        ///
        /// # Errors
        ///
        /// Returns an error when the socket can not be created, bound or
        /// connected, or when the connect times out.
        async 
        fn connectlog(&mut self) -> SyRes<()> 
        {
            let remote_addr = *self.tap_data.get_remote_addr();
            let bind_addr = *self.tap_data.get_bind_addr();

            // the socket family follows the family of the remote address
            let new_socket = 
                if remote_addr.is_ipv4() == true
                {
                    TcpSocket::new_v4()
                }
                else
                {
                    TcpSocket::new_v6()
                };

            let tcp_socket = 
                new_socket
                    .map_err(|e|
                        map_error!("cannot create tcp socket '{}', error: {}", remote_addr, e)
                    )?;

            // bind
            if bind_addr.ip().is_unspecified() == false
            {
                // report the address which failed to bind, not the remote one
                tcp_socket
                    .bind(bind_addr)
                    .map_err(|e|
                        map_error!("cannot bind tcp socket '{}', error: {}", bind_addr, e)
                    )?;
            }

            // the future is lazy, nothing happens until it is awaited below
            let connecting = tcp_socket.connect(remote_addr);

            let conn_res = 
                match self.tap_data.get_conn_timeout()
                {
                    Some(c_timeout) =>
                    {
                        // the outer error is the elapsed timeout
                        time::timeout(c_timeout, connecting)
                            .await
                            .map_err(|e|
                                map_error!("can not connect to tcp '{}', error: {}", remote_addr, e)
                            )?
                    },
                    None =>
                    {
                        connecting.await
                    }
                };

            let socket = 
                conn_res
                    .map_err(|e|
                        map_error_os!(e, "can not connect to tcp '{}', error: {}", remote_addr, e)
                    )?;

            // set socket
            self.sock = Some(socket);

            return Ok(());
        }

        /// Writes the whole `msg` to the stream and flushes it.
        ///
        /// Returns the length of `msg` on success or an error of kind
        /// `NotConnected` when there is no open connection.
        async 
        fn send(&mut self, msg: &[u8]) -> std::io::Result<usize> 
        {
            let Some(stream) = self.sock.as_mut()
            else
            {
                return Err(std::io::Error::new(ErrorKind::NotConnected, "no connection"));
            };

            stream.write_all(msg).await?;
            stream.flush().await?;

            return Ok(msg.len());
        }

        /// Shuts the stream down and drops it. Does nothing when the tap is
        /// not connected. The stream is dropped even if the shutdown fails.
        async 
        fn disconnectlog(&mut self) -> std::io::Result<()> 
        {
            if let Some(mut stream) = self.sock.take()
            {
                stream.shutdown().await?;
            }

            return Ok(());
        }

        /// Tells if the tap holds an open stream.
        fn is_connected(&self) -> bool 
        {
            self.sock.is_some()
        }

        /// Returns the type of the tap.
        fn get_type(&self) -> TapType 
        {
            self.cur_tap_type
        }

        /// Returns the max message length taken from the tap configuration.
        fn get_max_msg_size(&self) -> usize
        {
            self.tap_data.get_max_msg_len()
        }

        /// Replaces the tap configuration. An already opened connection is
        /// left untouched.
        fn update_tap_data(&mut self, tap_data: SyslogNetTcp)
        {
            self.tap_data = tap_data;
        }
    }

    impl AsyncSyslogTap<SyslogNetUdp> for AsyncTap<UdpSocket, SyslogNetUdp>
    {
        /// Creates a tap without a socket. The tap keeps `req_tap` as its
        /// configuration and reports `TapType::NetUdp` as its type.
        fn new(req_tap: SyslogNetUdp) -> SyRes<Self>
        {
            return Ok(
                Self
                {
                    sock: None,
                    tap_data: req_tap,
                    cur_tap_type: TapType::NetUdp,
                }
            );
        }

        /// Binds a UDP socket to the configured bind address and connects it
        /// to the configured remote address.
        ///
        /// # Errors
        ///
        /// Returns an error when either the bind or the connect fails.
        async 
        fn connectlog(&mut self) -> SyRes<()> 
        {
            let bound = UdpSocket::bind(self.tap_data.get_bind_addr()).await;

            let socket = 
                bound
                    .map_err(|e|
                        map_error_os!(e, "can not bind udp '{}', error: {}", self.tap_data.get_bind_addr(), e)
                    )?;

            let connected = socket.connect(self.tap_data.get_remote_addr()).await;

            connected
                .map_err(|e|
                    map_error_os!(e, "can not connect to udp '{}', error: {}", self.tap_data.get_remote_addr(), e)
                )?;

            // set socket
            self.sock = Some(socket);

            return Ok(());
        }

        /// Sends `msg` over the connected socket and returns what the
        /// socket's `send` returns. Fails with `NotConnected` when there is
        /// no socket.
        async 
        fn send(&mut self, msg: &[u8]) -> std::io::Result<usize> 
        {
            match self.sock.as_mut()
            {
                Some(socket) =>
                {
                    socket.send(msg).await
                },
                None =>
                {
                    Err(std::io::Error::new(ErrorKind::NotConnected, "no connection"))
                }
            }
        }

        /// Drops the socket, if there is any. Never fails.
        async
        fn disconnectlog(&mut self) -> std::io::Result<()> 
        {
            // taking the socket out of the option drops it right here
            drop(self.sock.take());

            return Ok(());
        }

        /// Tells if the tap holds a socket.
        fn is_connected(&self) -> bool 
        {
            self.sock.is_some()
        }

        /// Returns the type of the tap.
        fn get_type(&self) -> TapType 
        {
            self.cur_tap_type
        }

        /// Returns the max message length taken from the tap configuration.
        fn get_max_msg_size(&self) -> usize
        {
            self.tap_data.get_max_msg_len()
        }

        /// Replaces the tap configuration. An already opened socket is left
        /// untouched.
        fn update_tap_data(&mut self, tap_data: SyslogNetUdp)
        {
            self.tap_data = tap_data;
        }
    }
}
402
403
404impl AsyncSyslogTap<SyslogLocal> for AsyncTap<UnixDatagram, SyslogLocal>
405{
406
407    fn new(req_tap: SyslogLocal) -> SyRes<Self>
408    {
409        return Ok(
410            Self
411            {
412                sock: None, 
413                tap_data: req_tap,
414                cur_tap_type: TapType::None,
415            }
416        );
417    }
418
419
420    async 
421    fn connectlog(&mut self) -> SyRes<()> 
422    {
423        let sock = 
424            UnixDatagram
425                ::unbound()
426                    .map_err(|e|
427                        map_error_os!(e, "unbounded unix datagram initialization failure: {}", e)
428                    )?;
429
430        let tap_type = 
431            if self.tap_data.get_use_alternative() == false && self.tap_data.get_custom_remote_path().is_some() == true 
432            {
433                if let Err(e) = sock.connect(self.tap_data.get_custom_remote_path().as_ref().unwrap())
434                {
435                    throw_error_os!(e, "failed to open connection to syslog server at '{}'", 
436                        self.tap_data.get_custom_remote_path().as_ref().unwrap().display());
437                }
438                else 
439                {
440                    TapType::CustomLog
441                }
442            }
443            else if self.tap_data.get_custom_remote_path().is_some() == true &&
444                sock.connect(self.tap_data.get_custom_remote_path().as_ref().unwrap()).is_ok() == true
445            {
446                TapType::CustomLog
447            }
448            else if let Ok(_) = sock.connect(PATH_LOG_PRIV)
449            {
450                TapType::Priv
451            }
452            else if let Ok(_) = sock.connect(PATH_LOG)
453            {
454                TapType::UnPriv
455            }
456            else if let Ok(_) = sock.connect(PATH_OLDLOG)
457            {
458                TapType::OldLog
459            }
460            else if let Ok(_) = sock.connect(PATH_OSX)
461            {
462                TapType::Priv
463            }
464            else
465            {
466                // failed to open socket
467                throw_error_errno!(Errno::last(), "failed to open connection to syslog server");
468            };
469
470        #[cfg(any(
471            target_os = "freebsd",
472            target_os = "dragonfly",
473            target_os = "openbsd",
474            target_os = "netbsd",
475            target_os = "macos"
476        ))]
477        {
478            use std::os::fd::AsRawFd;
479
480            // setting sendbuf
481            let mut len: std::mem::MaybeUninit<nix::libc::socklen_t> = std::mem::MaybeUninit::uninit();
482
483            // set the sndbuf len
484            let res = 
485                unsafe
486                {
487                    nix::libc::getsockopt(
488                        sock.as_raw_fd(), 
489                        nix::libc::SOL_SOCKET, 
490                        nix::libc::SO_SNDBUF, 
491                        len.as_mut_ptr() as *mut nix::libc::c_void, 
492                        &mut { 
493                            std::mem::size_of::<nix::libc::socklen_t>() as nix::libc::socklen_t
494                        } 
495                    )
496                };
497    
498            if res == 0
499            {
500                let mut len = unsafe { len.assume_init() } as usize;
501
502                if len < MAXLINE
503                {
504                    len = MAXLINE;
505
506                    unsafe {
507                        nix::libc::setsockopt(
508                            sock.as_raw_fd(), 
509                            nix::libc::SOL_SOCKET, 
510                            nix::libc::SO_SNDBUF, 
511                            &len as *const _ as *const nix::libc::c_void, 
512                            std::mem::size_of::<nix::libc::socklen_t>() as nix::libc::socklen_t
513                        )
514                    };
515                }
516            }
517        }
518
519        self.sock = Some(sock);
520        self.cur_tap_type = tap_type;
521
522        return Ok(());
523    }
524
525    async 
526    fn send(&mut self, msg: &[u8]) -> std::io::Result<usize> 
527    {
528        let sock = 
529            self.sock
530                .as_mut()
531                .ok_or_else(||
532                    std::io::Error::new(ErrorKind::NotConnected, "no connection")
533                )?;
534
535        return sock.send(msg).await;
536    }
537
538    async 
539    fn disconnectlog(&mut self) -> std::io::Result<()>
540    {
541        match self.sock.take()
542        {
543            Some(s) => 
544            {
545                self.cur_tap_type = TapType::None;
546
547                s.shutdown(Shutdown::Both)
548            },
549            None =>
550            {
551                self.cur_tap_type = TapType::None;
552
553                Ok(())
554            }
555        }
556    }
557
558    fn is_connected(&self) -> bool
559    {
560        return self.sock.is_some();
561    }
562
563    fn get_type(&self) -> TapType 
564    {
565        return self.cur_tap_type;
566    }
567
568    fn get_max_msg_size(&self) -> usize
569    {
570        return self.tap_data.get_max_msg_len();
571    }
572
573    fn update_tap_data(&mut self, tap_data: SyslogLocal)
574    {
575        self.tap_data = tap_data;
576    }
577}
578
579
580
/// File transport for the tokio based asynchronous syslog tap: messages are
/// appended to a local file.
///
/// Compiled only when the `build_ext_file` feature is enabled.
#[cfg(feature = "build_ext_file")]
mod with_file
{
    use std::io::ErrorKind;

    use tokio::fs::File;

    use crate::{a_sync::{AsyncSyslogTap, AsyncTap}, error::SyRes,  map_error_os, SyslogDestMsg, SyslogFile, TapType};

    impl AsyncSyslogTap<SyslogFile> for AsyncTap<File, SyslogFile>
    {
        /// Creates a tap without an opened file. The tap keeps `req_tap` as
        /// its configuration and reports `TapType::LocalFile` as its type.
        fn new(req_tap: SyslogFile) -> SyRes<Self>
        {
            return Ok(
                Self
                {
                    sock: None,
                    tap_data: req_tap,
                    cur_tap_type: TapType::LocalFile,
                }
            );
        }

        /// Opens the configured file for appending and creates it when it
        /// does not exist.
        ///
        /// # Errors
        ///
        /// Returns an error when the file can not be opened.
        async 
        fn connectlog(&mut self) -> SyRes<()> 
        {
            let opened = 
                File::options()
                    .append(true)
                    .read(true)
                    .write(true)
                    .create(true)
                    .open(self.tap_data.get_path())
                    .await;

            let file = 
                opened
                    .map_err(|e|
                        map_error_os!(e, "can not open file '{}' to write, error: '{}'", 
                            self.tap_data.get_path().display(), e)
                    )?;

            self.sock = Some(file);

            // disconnectlog() resets the type to `None`, so it must be
            // restored here, otherwise a reconnected tap reports `None`.
            self.cur_tap_type = TapType::LocalFile;

            return Ok(());
        }

        /// Writes the whole `msg` to the file and flushes it.
        ///
        /// Returns the length of `msg` on success or an error of kind
        /// `NotConnected` when the file is not opened.
        async 
        fn send(&mut self, msg: &[u8]) -> std::io::Result<usize> 
        {
            use crate::tokio::io::AsyncWriteExt;

            let Some(file) = self.sock.as_mut()
            else
            {
                return Err(std::io::Error::new(ErrorKind::NotConnected, "no connection"));
            };

            file.write_all(msg).await?;

            // Flush like the stream based taps do, so a failure of this
            // write is reported by this call and not by a later one.
            file.flush().await?;

            return Ok(msg.len());
        }

        /// Resets the tap type to `TapType::None` and, if the file is
        /// opened, syncs its data and drops it. The file is dropped even if
        /// the sync fails.
        async 
        fn disconnectlog(&mut self) -> std::io::Result<()> 
        {
            self.cur_tap_type = TapType::None;

            if let Some(file) = self.sock.take()
            {
                file.sync_data().await?;
            }

            return Ok(());
        }

        /// Tells if the tap holds an opened file.
        fn is_connected(&self) -> bool 
        {
            self.sock.is_some()
        }

        /// Returns the type of the tap, `TapType::None` after a disconnect.
        fn get_type(&self) -> TapType 
        {
            self.cur_tap_type
        }

        /// Returns the max message length taken from the tap configuration.
        fn get_max_msg_size(&self) -> usize
        {
            self.tap_data.get_max_msg_len()
        }

        /// Replaces the tap configuration. An already opened file is left
        /// untouched.
        fn update_tap_data(&mut self, tap_data: SyslogFile)
        {
            self.tap_data = tap_data;
        }
    }
}
690