usiem_syslog/
tls.rs

1use coarsetime::Instant;
2use rustls::server::Acceptor;
3use rustls::{ServerConfig, ServerConnection};
4use rustls::pki_types::{CertificateDer, PrivateKeyDer};
5use usiem::components::dataset::SiemDatasetType;
6use usiem::components::dataset::text_map::TextMapSynDataset;
7use usiem::components::storage::DummyStateStorage;
8use usiem::crossbeam_channel::{self, Receiver, Sender};
9use usiem::prelude::{SiemComponentStateStorage, SiemError};
10use usiem::prelude::dataset::holder::DatasetHolder;
11use std::borrow::Cow;
12use std::collections::LinkedList;
13use std::io::{Read, ErrorKind};
14use std::net::TcpListener;
15use std::sync::Arc;
16
17
18use usiem::components::command::SiemCommandCall;
19use usiem::components::common::{
20    SiemComponentCapabilities, SiemMessage,
21};
22use usiem::components::SiemComponent;
23use usiem::events::SiemLog;
24
25use crate::common::read_log;
26#[cfg(feature="metrics")]
27use crate::metrics::{SyslogMetrics, generate_syslog_input_metrics};
28#[cfg(feature="metrics")]
29use usiem::components::metrics::SiemMetricDefinition;
30
31pub const CERTIFICATE_FILENAME_CONFIGURATION : &str = "CERTIFICATE_FILENAME";
32pub const PRIVATE_KEY_FILENAME_CONFIGURATION : &str = "PRIVATE_KEY_FILENAME";
33
34#[derive(Clone)]
35pub struct SyslogTlsInput {
36    host: Cow<'static, str>,
37    local_sender: Sender<SiemMessage>,
38    local_receiver: Receiver<SiemMessage>,
39    log_sender: Sender<SiemLog>,
40    log_receiver: Receiver<SiemLog>,
41    datasets : DatasetHolder,
42    #[cfg(feature="metrics")]
43    metrics: (Vec<SiemMetricDefinition>, SyslogMetrics),
44    storage: Box<dyn SiemComponentStateStorage>
45}
46
47impl SiemComponent for SyslogTlsInput {
48    fn name(&self) -> &'static str {
49        "SyslogTlsInput"
50    }
51
52    fn set_log_channel(&mut self, sender: Sender<SiemLog>, receiver: Receiver<SiemLog>) {
53        self.log_sender = sender;
54        self.log_receiver = receiver;
55    }
56
57    fn local_channel(&self) -> Sender<SiemMessage> {
58        self.local_sender.clone()
59    }
60
61    fn capabilities(&self) -> SiemComponentCapabilities {
62        return SiemComponentCapabilities::new(
63            Cow::Borrowed("SyslogTlsInput"),
64            Cow::Borrowed("Syslog input"),
65            Cow::Borrowed(""),
66            vec![],
67            vec![],
68            vec![],
69            self.metrics.0.clone(),
70        );
71    }
72
73    fn run(&mut self) -> Result<(), SiemError> {
74        let (cert_filename, key_filename) = if let (Some(a), Some(b)) = (self.get_certificate_filename(), self.get_private_key_filename()) {
75            (a, b)
76        }else {
77            return Err(SiemError::Configuration("Invalid certificate or key filename".into()));
78        };
79        let certs = self.load_certs(&cert_filename)?;
80        let key = self.load_private_key(&key_filename)?;
81
82        let server_config = ServerConfig::builder()
83            .with_no_client_auth()
84            .with_single_cert(certs, key)
85            .map_err(error)?;
86        let server_config = Arc::new(server_config);
87        let listener = match TcpListener::bind(&self.host[..]) {
88            Ok(v) => v,
89            Err(_) => {
90                return Err(SiemError::Io(format!("Cannot start TCP syslog listener on port: {}", self.host)));
91            }
92        };
93        if let Err(err) = listener.set_nonblocking(true) {
94            return Err(SiemError::Io(format!("Cannot start TCP syslog listener. Error setting nonblocking: {}", err))); 
95        }
96        let local_receiver = self.local_receiver.clone();
97        let log_sender = self.log_sender.clone();
98        let mut pending_connections = LinkedList::new();
99        let mut accepted_connections: LinkedList<(ServerConnection, std::net::TcpStream, Vec<u8>)> = LinkedList::new();
100        let mut buffer = [0; 4096];
101        let mut last_active_connections = 0;
102        loop {
103            Instant::update();
104            match listener.accept(){
105                Ok((stream, _socket)) => {
106                    let acceptor = Acceptor::default();
107                    let _ = stream.set_nonblocking(true);
108                    self.increase_total_connections_metric();
109                    pending_connections.push_back((acceptor, stream));
110                },
111                Err(e) => {
112                    match e.kind() {
113                        std::io::ErrorKind::WouldBlock => {
114                            match local_receiver.try_recv() {
115                                Ok(msg) => match msg {
116                                    SiemMessage::Command(_hdr, cmd) => match cmd {
117                                        SiemCommandCall::STOP_COMPONENT(_) => return Ok(()),
118                                        _ => {}
119                                    },
120                                    _ => {}
121                                },
122                                Err(_) => {}
123                            }
124                        }
125                        _ => {
126                            usiem::info!("Error listening for connections: {:?}", e);
127                            return Ok(());
128                        }
129                    }
130                },
131            }
132            let n_listeners = pending_connections.len();
133            for _ in 0..n_listeners {
134                let (mut acceptor, mut stream) = match pending_connections.pop_front() {
135                    Some(v) => v,
136                    None => break
137                };
138                if let Err(err) = acceptor.read_tls(&mut stream) {
139                    if let std::io::ErrorKind::WouldBlock = err.kind() {
140                        pending_connections.push_back((acceptor, stream));
141                        continue;
142                    }
143                    usiem::debug!("Cannot accept TLS request: {:?}", err);
144                    continue
145                }
146                match acceptor.accept() {
147                    Ok(None) => {},
148                    Ok(Some(v)) => {
149                        let conn = match v.into_connection(server_config.clone()) {
150                            Ok(v) => v,
151                            Err(err) => {
152                                usiem::debug!("Cannot accept TLS request: {:?}", err);
153                                continue
154                            }
155                        };
156                        accepted_connections.push_back((conn, stream, Vec::with_capacity(4096)));
157                        continue
158                    },
159                    Err(err) => {
160                        usiem::debug!("Cannot accept TLS request: {:?}", err);
161                        continue
162                    }
163                }
164                pending_connections.push_front((acceptor, stream));
165
166            };
167            let n_connections = accepted_connections.len();
168            for _ in 0..n_connections {
169                let (mut connection, mut stream, mut text_log) = match accepted_connections.pop_front() {
170                    Some(v) => v,
171                    None => break
172                };
173                let mut tls_stream = rustls::Stream::new(&mut connection, &mut stream); 
174                let readed = match tls_stream.read(&mut buffer) {
175                    Ok(v) => v,
176                    Err(err) => {
177                        if let ErrorKind::WouldBlock = err.kind() {
178                            accepted_connections.push_back((connection, stream, text_log));
179                            continue;
180                        }
181                        continue
182                    }
183                };
184                if readed == 0 {
185                    continue;
186                }
187                self.increase_received_bytes_metric(readed);
188                let sent = read_log(&buffer[0..readed], &mut text_log, &log_sender);
189                self.increase_received_logs_metric_by(sent);
190                accepted_connections.push_back((connection, stream, text_log));
191            } 
192
193
194            if last_active_connections != accepted_connections.len() {
195                last_active_connections = accepted_connections.len();
196                self.set_active_connections_metric(last_active_connections);
197            }
198        }
199    }
200
201    fn duplicate(&self) -> Box<dyn SiemComponent> {
202        Box::new(self.clone())
203    }
204
205    fn set_datasets(&mut self, datasets: DatasetHolder) {
206        self.datasets = datasets;
207    }
208
209    fn set_storage(&mut self, storage: Box<dyn SiemComponentStateStorage>) {
210        //Not required
211        self.storage = storage;
212    }
213}
214
215impl SyslogTlsInput {
216    pub fn new(host: &str, name : &str) -> SyslogTlsInput {
217        let (local_sender, local_receiver) = crossbeam_channel::bounded(1000);
218        let (log_sender, log_receiver) = crossbeam_channel::bounded(1000);
219        SyslogTlsInput {
220            host : Cow::Owned(host.into()),
221            local_sender,
222            local_receiver,
223            log_sender,
224            log_receiver,
225            datasets : DatasetHolder::new(),
226            #[cfg(feature="metrics")]
227            metrics : generate_syslog_input_metrics(name),
228            storage : Box::new(DummyStateStorage{})
229        }
230    }
231    fn _increase_received_logs_metric(&self) {
232        #[cfg(feature="metrics")]
233        self.metrics.1.received_logs.inc();
234    }
235    fn increase_received_logs_metric_by(&self, total : usize) {
236        #[cfg(feature="metrics")]
237        self.metrics.1.received_logs.inc_by(total as i64);
238    }
239    fn increase_total_connections_metric(&self) {
240        #[cfg(feature="metrics")]
241        self.metrics.1.total_connections.inc();
242    }
243    fn increase_received_bytes_metric(&self, bytes : usize) {
244        #[cfg(feature="metrics")]
245        self.metrics.1.received_bytes.inc_by(bytes as i64);
246    }
247    fn set_active_connections_metric(&self, connections : usize) {
248        #[cfg(feature="metrics")]
249        self.metrics.1.active_connections.set(connections as f64);
250    }
251    fn load_certs(&self, filename: &str) -> Result<Vec<CertificateDer<'static>>, SiemError> {
252        let file = self.storage.get_file(filename)?;
253        match rustls_pemfile::read_one_from_slice(&file) {
254            Ok(Some(v)) => match v.0 {
255                rustls_pemfile::Item::X509Certificate(cert) => Ok(vec![cert]),
256                _ => Err(SiemError::Storage(usiem::err::StorageError::NotExists))
257            },
258            Err(err) => Err(SiemError::Configuration(format!("Invalid PEM certificate: {:?}", err))),
259            Ok(None) => Err(SiemError::Configuration("Empty certificate".into()))
260        }
261    }
262    
263    /// Loads the server private key from a file
264    fn load_private_key(&self, filename: &str) -> Result<PrivateKeyDer<'static>, SiemError> {
265        let file = self.storage.get_file(filename)?;
266        match rustls_pemfile::read_one_from_slice(&file) {
267            Ok(Some(v)) => match v.0 {
268                rustls_pemfile::Item::Pkcs1Key(key) => Ok(key.into()),
269                rustls_pemfile::Item::Pkcs8Key(key) => Ok(key.into()),
270                rustls_pemfile::Item::Sec1Key(key) => Ok(key.into()),
271                _ => Err(SiemError::Configuration("Invalid key format".into()))
272            },
273            Err(err) => Err(SiemError::Configuration(format!("Invalid PEM certificate: {:?}", err))),
274            Ok(None) => Err(SiemError::Configuration("Empty certificate".into()))
275        }
276    }
277
278    fn get_certificate_filename(&self) -> Option<String> {
279        let config : &TextMapSynDataset = self.datasets.get(&SiemDatasetType::Configuration)?.try_into().ok()?;
280        let filename = config.get(CERTIFICATE_FILENAME_CONFIGURATION)?;
281        Some(filename.to_string())
282    }
283    fn get_private_key_filename(&self) -> Option<String> {
284        let config : &TextMapSynDataset = self.datasets.get(&SiemDatasetType::Configuration)?.try_into().ok()?;
285        let filename = config.get(PRIVATE_KEY_FILENAME_CONFIGURATION)?;
286        Some(filename.to_string())
287    }
288}
289
290
291fn error(err: rustls::Error) -> SiemError {
292    SiemError::Other(err.to_string())
293}
294
295#[cfg(test)]
296mod tst {
297
298    use super::*;
299    use std::thread;
300    use crate::testing::*;
301
302    #[test]
303    fn tls_syslog_basic_test() {
304        let sys_input = SyslogTlsInput::new(TLS_LISTENING_HOST, SYSLOG_COMPONENT_NAME);
305        let (capabilities, log_receiver) = prepare_syslog_basic_test(Box::new(sys_input));
306        let mut client = testing_tls_client();
307        let stream = client.stream();
308        thread::sleep(std::time::Duration::from_millis(100));
309        syslog_basic_test(stream, &capabilities, &log_receiver);
310        drop(client);
311        thread::sleep(std::time::Duration::from_millis(100));
312        check_no_active_connections(&capabilities);
313    }
314}