logstash_rs/output/
tcp.rs

1use crate::prelude::*;
2use std::fmt::Write as FMTWrite;
3use std::io::Write as IOWrite;
4use std::net::TcpStream;
5use std::net::ToSocketAddrs;
6use std::sync::Mutex;
7use std::time::Duration;
8
9type Stream = Box<dyn IOWrite + Sync + Send>;
10
11pub(crate) struct AdvancedTcpStream {
12    hostname: String,
13    port: u16,
14    use_tls: bool,
15    stream: Mutex<Option<Stream>>,
16    connection_timeout: Option<Duration>,
17}
18
19impl AdvancedTcpStream {
20    pub(crate) fn new(
21        hostname: String,
22        port: u16,
23        use_tls: bool,
24        connection_timeout: Option<Duration>,
25    ) -> Self {
26        Self {
27            hostname,
28            port,
29            use_tls,
30            stream: Mutex::new(None),
31            connection_timeout,
32        }
33    }
34
35    pub(crate) fn send_bytes(&self, bytes: &[u8]) -> Result<()> {
36        let mut stream = self.stream.lock()?;
37        let should_repeat = self.send_bytes_inner(&mut stream, bytes)?;
38        if should_repeat {
39            self.send_bytes_inner(&mut stream, bytes)?;
40        }
41        Ok(())
42    }
43
44    fn send_bytes_inner(&self, stream: &mut Option<Stream>, bytes: &[u8]) -> Result<bool> {
45        let recreated = self.recreate_stream_if_needed(stream)?;
46        if let Err(err) = stream.as_mut().expect("should be some").write_all(bytes) {
47            *stream = None;
48            if !recreated {
49                return Ok(true);
50            }
51            return Err(err.into());
52        }
53        Ok(false)
54    }
55
56    fn recreate_stream_if_needed(&self, stream: &mut Option<Stream>) -> Result<bool> {
57        if stream.is_none() {
58            *stream = Some(if self.use_tls {
59                self.create_tls_connection()?
60            } else {
61                self.create_tcp_connection()?
62            });
63            Ok(true)
64        } else {
65            Ok(false)
66        }
67    }
68
69    fn create_connection(&self) -> Result<TcpStream> {
70        let addr = (self.hostname.as_str(), self.port)
71            .to_socket_addrs()?
72            .next()
73            .ok_or_else(|| Error::AddressResolution(self.hostname.clone(), self.port))?;
74        let stream = if let Some(timeout) = self.connection_timeout {
75            TcpStream::connect_timeout(&addr, timeout)?
76        } else {
77            TcpStream::connect(addr)?
78        };
79        Ok(stream)
80    }
81
82    fn create_tcp_connection(&self) -> Result<Stream> {
83        Ok(Box::new(self.create_connection()?))
84    }
85
86    #[cfg(all(feature = "tls", feature = "rustls"))]
87    fn create_tls_connection(&self) -> Result<Stream> {
88        compile_error!("Select one of 'tls' or 'rustls' feature");
89        unreachable!();
90    }
91
92    #[cfg(all(feature = "tls", not(feature = "rustls")))]
93    fn create_tls_connection(&self) -> Result<Stream> {
94        use native_tls::HandshakeError;
95        let conn = native_tls::TlsConnector::new()?;
96        let stream = self.create_connection()?;
97        let mut stream = conn.connect(self.hostname.as_str(), stream);
98        while let Err(err) = stream {
99            match err {
100                HandshakeError::Failure(err) => return Err(err.into()),
101                HandshakeError::WouldBlock(block) => {
102                    stream = block.handshake();
103                }
104            }
105        }
106        Ok(Box::new(stream.expect("handshake completed")))
107    }
108
109    #[cfg(all(not(feature = "tls"), feature = "rustls"))]
110    fn create_tls_connection(&self) -> Result<Stream> {
111        use std::convert::TryInto;
112        use std::sync::Arc;
113        let mut root_store = rustls_crate::RootCertStore::empty();
114        root_store.add_server_trust_anchors(webpki_roots::TLS_SERVER_ROOTS.0.iter().map(|ta| {
115            rustls_crate::OwnedTrustAnchor::from_subject_spki_name_constraints(
116                ta.subject,
117                ta.spki,
118                ta.name_constraints,
119            )
120        }));
121        let config = rustls_crate::ClientConfig::builder()
122            .with_safe_defaults()
123            .with_root_certificates(root_store)
124            .with_no_client_auth();
125        let session = rustls_crate::ClientConnection::new(
126            Arc::new(config),
127            self.hostname.as_str().try_into()?,
128        )?;
129        let stream = self.create_connection()?;
130        let stream = rustls_crate::StreamOwned::new(session, stream);
131        Ok(Box::new(stream))
132    }
133
134    #[cfg(all(not(feature = "tls"), not(feature = "rustls")))]
135    fn create_tls_connection(&self) -> Result<Stream> {
136        panic!("TLS is not supported. Please enable 'tls' feature")
137    }
138
139    fn flush(&self) -> Result<()> {
140        let mut stream = self.stream.lock()?;
141        let recreated = self.recreate_stream_if_needed(&mut stream)?;
142        if !recreated {
143            stream.as_mut().expect("should be some").flush()?;
144        }
145        Ok(())
146    }
147}
148
149pub struct TcpSender {
150    stream: AdvancedTcpStream,
151}
152
153impl TcpSender {
154    pub fn new(
155        hostname: String,
156        port: u16,
157        use_tls: bool,
158        connection_timeout: Option<Duration>,
159    ) -> Self {
160        Self {
161            stream: AdvancedTcpStream::new(hostname, port, use_tls, connection_timeout),
162        }
163    }
164}
165
166impl Sender for TcpSender {
167    fn send(&self, event: LogStashRecord) -> Result<()> {
168        let mut event = serde_json::to_string(&event)?;
169        event.write_char('\n')?;
170        self.stream.send_bytes(event.as_bytes())?;
171        Ok(())
172    }
173
174    fn send_batch(&self, events: Vec<LogStashRecord>) -> Result<()> {
175        if events.is_empty() {
176            return Ok(());
177        }
178        let mut buf = vec![];
179        for event in events {
180            serde_json::to_writer(&mut buf, &event)?;
181            buf.push('\n' as u8);
182        }
183        self.stream.send_bytes(&buf)?;
184        Ok(())
185    }
186
187    fn flush(&self) -> Result<()> {
188        self.stream.flush()?;
189        Ok(())
190    }
191}
192
193impl log::Log for TcpSender {
194    fn enabled(&self, _metadata: &log::Metadata) -> bool {
195        true
196    }
197
198    fn log(&self, record: &log::Record) {
199        let record = LogStashRecord::from_record(record);
200        let _ = self.send(record);
201    }
202
203    fn flush(&self) {
204        let _ = Sender::flush(self);
205    }
206}