logstash_rs/output/
tcp.rs1use crate::prelude::*;
2use std::fmt::Write as FMTWrite;
3use std::io::Write as IOWrite;
4use std::net::TcpStream;
5use std::net::ToSocketAddrs;
6use std::sync::Mutex;
7use std::time::Duration;
8
9type Stream = Box<dyn IOWrite + Sync + Send>;
10
11pub(crate) struct AdvancedTcpStream {
12 hostname: String,
13 port: u16,
14 use_tls: bool,
15 stream: Mutex<Option<Stream>>,
16 connection_timeout: Option<Duration>,
17}
18
19impl AdvancedTcpStream {
20 pub(crate) fn new(
21 hostname: String,
22 port: u16,
23 use_tls: bool,
24 connection_timeout: Option<Duration>,
25 ) -> Self {
26 Self {
27 hostname,
28 port,
29 use_tls,
30 stream: Mutex::new(None),
31 connection_timeout,
32 }
33 }
34
35 pub(crate) fn send_bytes(&self, bytes: &[u8]) -> Result<()> {
36 let mut stream = self.stream.lock()?;
37 let should_repeat = self.send_bytes_inner(&mut stream, bytes)?;
38 if should_repeat {
39 self.send_bytes_inner(&mut stream, bytes)?;
40 }
41 Ok(())
42 }
43
44 fn send_bytes_inner(&self, stream: &mut Option<Stream>, bytes: &[u8]) -> Result<bool> {
45 let recreated = self.recreate_stream_if_needed(stream)?;
46 if let Err(err) = stream.as_mut().expect("should be some").write_all(bytes) {
47 *stream = None;
48 if !recreated {
49 return Ok(true);
50 }
51 return Err(err.into());
52 }
53 Ok(false)
54 }
55
56 fn recreate_stream_if_needed(&self, stream: &mut Option<Stream>) -> Result<bool> {
57 if stream.is_none() {
58 *stream = Some(if self.use_tls {
59 self.create_tls_connection()?
60 } else {
61 self.create_tcp_connection()?
62 });
63 Ok(true)
64 } else {
65 Ok(false)
66 }
67 }
68
69 fn create_connection(&self) -> Result<TcpStream> {
70 let addr = (self.hostname.as_str(), self.port)
71 .to_socket_addrs()?
72 .next()
73 .ok_or_else(|| Error::AddressResolution(self.hostname.clone(), self.port))?;
74 let stream = if let Some(timeout) = self.connection_timeout {
75 TcpStream::connect_timeout(&addr, timeout)?
76 } else {
77 TcpStream::connect(addr)?
78 };
79 Ok(stream)
80 }
81
82 fn create_tcp_connection(&self) -> Result<Stream> {
83 Ok(Box::new(self.create_connection()?))
84 }
85
86 #[cfg(all(feature = "tls", feature = "rustls"))]
87 fn create_tls_connection(&self) -> Result<Stream> {
88 compile_error!("Select one of 'tls' or 'rustls' feature");
89 unreachable!();
90 }
91
92 #[cfg(all(feature = "tls", not(feature = "rustls")))]
93 fn create_tls_connection(&self) -> Result<Stream> {
94 use native_tls::HandshakeError;
95 let conn = native_tls::TlsConnector::new()?;
96 let stream = self.create_connection()?;
97 let mut stream = conn.connect(self.hostname.as_str(), stream);
98 while let Err(err) = stream {
99 match err {
100 HandshakeError::Failure(err) => return Err(err.into()),
101 HandshakeError::WouldBlock(block) => {
102 stream = block.handshake();
103 }
104 }
105 }
106 Ok(Box::new(stream.expect("handshake completed")))
107 }
108
109 #[cfg(all(not(feature = "tls"), feature = "rustls"))]
110 fn create_tls_connection(&self) -> Result<Stream> {
111 use std::convert::TryInto;
112 use std::sync::Arc;
113 let mut root_store = rustls_crate::RootCertStore::empty();
114 root_store.add_server_trust_anchors(webpki_roots::TLS_SERVER_ROOTS.0.iter().map(|ta| {
115 rustls_crate::OwnedTrustAnchor::from_subject_spki_name_constraints(
116 ta.subject,
117 ta.spki,
118 ta.name_constraints,
119 )
120 }));
121 let config = rustls_crate::ClientConfig::builder()
122 .with_safe_defaults()
123 .with_root_certificates(root_store)
124 .with_no_client_auth();
125 let session = rustls_crate::ClientConnection::new(
126 Arc::new(config),
127 self.hostname.as_str().try_into()?,
128 )?;
129 let stream = self.create_connection()?;
130 let stream = rustls_crate::StreamOwned::new(session, stream);
131 Ok(Box::new(stream))
132 }
133
134 #[cfg(all(not(feature = "tls"), not(feature = "rustls")))]
135 fn create_tls_connection(&self) -> Result<Stream> {
136 panic!("TLS is not supported. Please enable 'tls' feature")
137 }
138
139 fn flush(&self) -> Result<()> {
140 let mut stream = self.stream.lock()?;
141 let recreated = self.recreate_stream_if_needed(&mut stream)?;
142 if !recreated {
143 stream.as_mut().expect("should be some").flush()?;
144 }
145 Ok(())
146 }
147}
148
149pub struct TcpSender {
150 stream: AdvancedTcpStream,
151}
152
153impl TcpSender {
154 pub fn new(
155 hostname: String,
156 port: u16,
157 use_tls: bool,
158 connection_timeout: Option<Duration>,
159 ) -> Self {
160 Self {
161 stream: AdvancedTcpStream::new(hostname, port, use_tls, connection_timeout),
162 }
163 }
164}
165
166impl Sender for TcpSender {
167 fn send(&self, event: LogStashRecord) -> Result<()> {
168 let mut event = serde_json::to_string(&event)?;
169 event.write_char('\n')?;
170 self.stream.send_bytes(event.as_bytes())?;
171 Ok(())
172 }
173
174 fn send_batch(&self, events: Vec<LogStashRecord>) -> Result<()> {
175 if events.is_empty() {
176 return Ok(());
177 }
178 let mut buf = vec![];
179 for event in events {
180 serde_json::to_writer(&mut buf, &event)?;
181 buf.push('\n' as u8);
182 }
183 self.stream.send_bytes(&buf)?;
184 Ok(())
185 }
186
187 fn flush(&self) -> Result<()> {
188 self.stream.flush()?;
189 Ok(())
190 }
191}
192
193impl log::Log for TcpSender {
194 fn enabled(&self, _metadata: &log::Metadata) -> bool {
195 true
196 }
197
198 fn log(&self, record: &log::Record) {
199 let record = LogStashRecord::from_record(record);
200 let _ = self.send(record);
201 }
202
203 fn flush(&self) {
204 let _ = Sender::flush(self);
205 }
206}