1use coarsetime::Instant;
2use rustls::server::Acceptor;
3use rustls::{ServerConfig, ServerConnection};
4use rustls::pki_types::{CertificateDer, PrivateKeyDer};
5use usiem::components::dataset::SiemDatasetType;
6use usiem::components::dataset::text_map::TextMapSynDataset;
7use usiem::components::storage::DummyStateStorage;
8use usiem::crossbeam_channel::{self, Receiver, Sender};
9use usiem::prelude::{SiemComponentStateStorage, SiemError};
10use usiem::prelude::dataset::holder::DatasetHolder;
11use std::borrow::Cow;
12use std::collections::LinkedList;
13use std::io::{Read, ErrorKind};
14use std::net::TcpListener;
15use std::sync::Arc;
16
17
18use usiem::components::command::SiemCommandCall;
19use usiem::components::common::{
20 SiemComponentCapabilities, SiemMessage,
21};
22use usiem::components::SiemComponent;
23use usiem::events::SiemLog;
24
25use crate::common::read_log;
26#[cfg(feature="metrics")]
27use crate::metrics::{SyslogMetrics, generate_syslog_input_metrics};
28#[cfg(feature="metrics")]
29use usiem::components::metrics::SiemMetricDefinition;
30
/// Key looked up in the `Configuration` dataset to obtain the name of the
/// file (resolved through the component storage) holding the PEM certificate.
pub const CERTIFICATE_FILENAME_CONFIGURATION : &str = "CERTIFICATE_FILENAME";
/// Key looked up in the `Configuration` dataset to obtain the name of the
/// file (resolved through the component storage) holding the PEM private key.
pub const PRIVATE_KEY_FILENAME_CONFIGURATION : &str = "PRIVATE_KEY_FILENAME";
33
34#[derive(Clone)]
35pub struct SyslogTlsInput {
36 host: Cow<'static, str>,
37 local_sender: Sender<SiemMessage>,
38 local_receiver: Receiver<SiemMessage>,
39 log_sender: Sender<SiemLog>,
40 log_receiver: Receiver<SiemLog>,
41 datasets : DatasetHolder,
42 #[cfg(feature="metrics")]
43 metrics: (Vec<SiemMetricDefinition>, SyslogMetrics),
44 storage: Box<dyn SiemComponentStateStorage>
45}
46
47impl SiemComponent for SyslogTlsInput {
48 fn name(&self) -> &'static str {
49 "SyslogTlsInput"
50 }
51
52 fn set_log_channel(&mut self, sender: Sender<SiemLog>, receiver: Receiver<SiemLog>) {
53 self.log_sender = sender;
54 self.log_receiver = receiver;
55 }
56
57 fn local_channel(&self) -> Sender<SiemMessage> {
58 self.local_sender.clone()
59 }
60
61 fn capabilities(&self) -> SiemComponentCapabilities {
62 return SiemComponentCapabilities::new(
63 Cow::Borrowed("SyslogTlsInput"),
64 Cow::Borrowed("Syslog input"),
65 Cow::Borrowed(""),
66 vec![],
67 vec![],
68 vec![],
69 self.metrics.0.clone(),
70 );
71 }
72
73 fn run(&mut self) -> Result<(), SiemError> {
74 let (cert_filename, key_filename) = if let (Some(a), Some(b)) = (self.get_certificate_filename(), self.get_private_key_filename()) {
75 (a, b)
76 }else {
77 return Err(SiemError::Configuration("Invalid certificate or key filename".into()));
78 };
79 let certs = self.load_certs(&cert_filename)?;
80 let key = self.load_private_key(&key_filename)?;
81
82 let server_config = ServerConfig::builder()
83 .with_no_client_auth()
84 .with_single_cert(certs, key)
85 .map_err(error)?;
86 let server_config = Arc::new(server_config);
87 let listener = match TcpListener::bind(&self.host[..]) {
88 Ok(v) => v,
89 Err(_) => {
90 return Err(SiemError::Io(format!("Cannot start TCP syslog listener on port: {}", self.host)));
91 }
92 };
93 if let Err(err) = listener.set_nonblocking(true) {
94 return Err(SiemError::Io(format!("Cannot start TCP syslog listener. Error setting nonblocking: {}", err)));
95 }
96 let local_receiver = self.local_receiver.clone();
97 let log_sender = self.log_sender.clone();
98 let mut pending_connections = LinkedList::new();
99 let mut accepted_connections: LinkedList<(ServerConnection, std::net::TcpStream, Vec<u8>)> = LinkedList::new();
100 let mut buffer = [0; 4096];
101 let mut last_active_connections = 0;
102 loop {
103 Instant::update();
104 match listener.accept(){
105 Ok((stream, _socket)) => {
106 let acceptor = Acceptor::default();
107 let _ = stream.set_nonblocking(true);
108 self.increase_total_connections_metric();
109 pending_connections.push_back((acceptor, stream));
110 },
111 Err(e) => {
112 match e.kind() {
113 std::io::ErrorKind::WouldBlock => {
114 match local_receiver.try_recv() {
115 Ok(msg) => match msg {
116 SiemMessage::Command(_hdr, cmd) => match cmd {
117 SiemCommandCall::STOP_COMPONENT(_) => return Ok(()),
118 _ => {}
119 },
120 _ => {}
121 },
122 Err(_) => {}
123 }
124 }
125 _ => {
126 usiem::info!("Error listening for connections: {:?}", e);
127 return Ok(());
128 }
129 }
130 },
131 }
132 let n_listeners = pending_connections.len();
133 for _ in 0..n_listeners {
134 let (mut acceptor, mut stream) = match pending_connections.pop_front() {
135 Some(v) => v,
136 None => break
137 };
138 if let Err(err) = acceptor.read_tls(&mut stream) {
139 if let std::io::ErrorKind::WouldBlock = err.kind() {
140 pending_connections.push_back((acceptor, stream));
141 continue;
142 }
143 usiem::debug!("Cannot accept TLS request: {:?}", err);
144 continue
145 }
146 match acceptor.accept() {
147 Ok(None) => {},
148 Ok(Some(v)) => {
149 let conn = match v.into_connection(server_config.clone()) {
150 Ok(v) => v,
151 Err(err) => {
152 usiem::debug!("Cannot accept TLS request: {:?}", err);
153 continue
154 }
155 };
156 accepted_connections.push_back((conn, stream, Vec::with_capacity(4096)));
157 continue
158 },
159 Err(err) => {
160 usiem::debug!("Cannot accept TLS request: {:?}", err);
161 continue
162 }
163 }
164 pending_connections.push_front((acceptor, stream));
165
166 };
167 let n_connections = accepted_connections.len();
168 for _ in 0..n_connections {
169 let (mut connection, mut stream, mut text_log) = match accepted_connections.pop_front() {
170 Some(v) => v,
171 None => break
172 };
173 let mut tls_stream = rustls::Stream::new(&mut connection, &mut stream);
174 let readed = match tls_stream.read(&mut buffer) {
175 Ok(v) => v,
176 Err(err) => {
177 if let ErrorKind::WouldBlock = err.kind() {
178 accepted_connections.push_back((connection, stream, text_log));
179 continue;
180 }
181 continue
182 }
183 };
184 if readed == 0 {
185 continue;
186 }
187 self.increase_received_bytes_metric(readed);
188 let sent = read_log(&buffer[0..readed], &mut text_log, &log_sender);
189 self.increase_received_logs_metric_by(sent);
190 accepted_connections.push_back((connection, stream, text_log));
191 }
192
193
194 if last_active_connections != accepted_connections.len() {
195 last_active_connections = accepted_connections.len();
196 self.set_active_connections_metric(last_active_connections);
197 }
198 }
199 }
200
201 fn duplicate(&self) -> Box<dyn SiemComponent> {
202 Box::new(self.clone())
203 }
204
205 fn set_datasets(&mut self, datasets: DatasetHolder) {
206 self.datasets = datasets;
207 }
208
209 fn set_storage(&mut self, storage: Box<dyn SiemComponentStateStorage>) {
210 self.storage = storage;
212 }
213}
214
215impl SyslogTlsInput {
216 pub fn new(host: &str, name : &str) -> SyslogTlsInput {
217 let (local_sender, local_receiver) = crossbeam_channel::bounded(1000);
218 let (log_sender, log_receiver) = crossbeam_channel::bounded(1000);
219 SyslogTlsInput {
220 host : Cow::Owned(host.into()),
221 local_sender,
222 local_receiver,
223 log_sender,
224 log_receiver,
225 datasets : DatasetHolder::new(),
226 #[cfg(feature="metrics")]
227 metrics : generate_syslog_input_metrics(name),
228 storage : Box::new(DummyStateStorage{})
229 }
230 }
231 fn _increase_received_logs_metric(&self) {
232 #[cfg(feature="metrics")]
233 self.metrics.1.received_logs.inc();
234 }
235 fn increase_received_logs_metric_by(&self, total : usize) {
236 #[cfg(feature="metrics")]
237 self.metrics.1.received_logs.inc_by(total as i64);
238 }
239 fn increase_total_connections_metric(&self) {
240 #[cfg(feature="metrics")]
241 self.metrics.1.total_connections.inc();
242 }
243 fn increase_received_bytes_metric(&self, bytes : usize) {
244 #[cfg(feature="metrics")]
245 self.metrics.1.received_bytes.inc_by(bytes as i64);
246 }
247 fn set_active_connections_metric(&self, connections : usize) {
248 #[cfg(feature="metrics")]
249 self.metrics.1.active_connections.set(connections as f64);
250 }
251 fn load_certs(&self, filename: &str) -> Result<Vec<CertificateDer<'static>>, SiemError> {
252 let file = self.storage.get_file(filename)?;
253 match rustls_pemfile::read_one_from_slice(&file) {
254 Ok(Some(v)) => match v.0 {
255 rustls_pemfile::Item::X509Certificate(cert) => Ok(vec![cert]),
256 _ => Err(SiemError::Storage(usiem::err::StorageError::NotExists))
257 },
258 Err(err) => Err(SiemError::Configuration(format!("Invalid PEM certificate: {:?}", err))),
259 Ok(None) => Err(SiemError::Configuration("Empty certificate".into()))
260 }
261 }
262
263 fn load_private_key(&self, filename: &str) -> Result<PrivateKeyDer<'static>, SiemError> {
265 let file = self.storage.get_file(filename)?;
266 match rustls_pemfile::read_one_from_slice(&file) {
267 Ok(Some(v)) => match v.0 {
268 rustls_pemfile::Item::Pkcs1Key(key) => Ok(key.into()),
269 rustls_pemfile::Item::Pkcs8Key(key) => Ok(key.into()),
270 rustls_pemfile::Item::Sec1Key(key) => Ok(key.into()),
271 _ => Err(SiemError::Configuration("Invalid key format".into()))
272 },
273 Err(err) => Err(SiemError::Configuration(format!("Invalid PEM certificate: {:?}", err))),
274 Ok(None) => Err(SiemError::Configuration("Empty certificate".into()))
275 }
276 }
277
278 fn get_certificate_filename(&self) -> Option<String> {
279 let config : &TextMapSynDataset = self.datasets.get(&SiemDatasetType::Configuration)?.try_into().ok()?;
280 let filename = config.get(CERTIFICATE_FILENAME_CONFIGURATION)?;
281 Some(filename.to_string())
282 }
283 fn get_private_key_filename(&self) -> Option<String> {
284 let config : &TextMapSynDataset = self.datasets.get(&SiemDatasetType::Configuration)?.try_into().ok()?;
285 let filename = config.get(PRIVATE_KEY_FILENAME_CONFIGURATION)?;
286 Some(filename.to_string())
287 }
288}
289
290
291fn error(err: rustls::Error) -> SiemError {
292 SiemError::Other(err.to_string())
293}
294
#[cfg(test)]
mod tst {

    use super::*;
    use crate::testing::*;
    use std::thread;

    /// Runs the shared basic syslog test over a TLS client stream, then drops
    /// the client and checks that no connection is reported as active.
    #[test]
    fn tls_syslog_basic_test() {
        // Pause used both before sending and after disconnecting.
        let settle_time = std::time::Duration::from_millis(100);

        let component = SyslogTlsInput::new(TLS_LISTENING_HOST, SYSLOG_COMPONENT_NAME);
        let (capabilities, log_receiver) = prepare_syslog_basic_test(Box::new(component));
        let mut tls_client = testing_tls_client();
        let tls_stream = tls_client.stream();
        thread::sleep(settle_time);
        syslog_basic_test(tls_stream, &capabilities, &log_receiver);

        drop(tls_client);
        thread::sleep(settle_time);
        check_no_active_connections(&capabilities);
    }
}