1#![deny(unused_must_use, unused_imports, bare_trait_objects)]
58pub mod config;
59mod errors;
60mod eve;
61mod intel;
62#[allow(dead_code)]
63#[cfg(feature = "protobuf")]
64mod serde_helpers;
65
66pub mod prelude {
67 pub use super::config::Config;
68 pub use super::errors::Error;
69 pub use super::eve::*;
70 pub use super::intel::{
71 CachedRule, IdsKey, IntelCache, Observable, Observed, Rule, Rules, Tracer,
72 };
73 #[cfg(feature = "protobuf")]
74 pub use super::proto;
75 pub use super::Ids;
76 pub use packet_ipc::AsIpcPacket;
77
78 pub use chrono;
79}
80
81#[cfg(feature = "protobuf")]
82pub(crate) use eve::parse_date_time;
83
84#[allow(missing_docs)]
85#[cfg(feature = "protobuf")]
86pub mod proto {
87 tonic::include_proto!("suricata");
88
89 impl crate::intel::Observable for Eve {
90 fn timestamp(&self) -> chrono::DateTime<chrono::Utc> {
91 self.timestamp
92 .clone()
93 .map(|ts| {
94 let ts = chrono::NaiveDateTime::from_timestamp(ts.seconds, ts.nanos as _);
95 chrono::DateTime::from_utc(ts, chrono::Utc)
96 })
97 .unwrap_or_else(|| chrono::Utc::now())
98 }
99
100 fn key(&self) -> Option<crate::intel::IdsKey> {
101 self.alert.as_ref().map(|a| crate::intel::IdsKey {
102 gid: a.gid as _,
103 sid: a.signature_id as _,
104 })
105 }
106 }
107}
108
109use crate::config::output::{Output, OutputType};
110use config::Config;
111use log::*;
112use packet_ipc::ConnectedIpc;
113use prelude::*;
114use smol::future::or;
115use smol::io::AsyncBufReadExt;
116use smol::stream::{Stream, StreamExt};
117use smol::Task;
118use std::path::PathBuf;
119use std::time::Duration;
120
121pub struct SpawnContext<'a, M> {
124 process: Option<std::process::Child>,
125 awaiting_servers: Vec<Task<Result<packet_ipc::ConnectedIpc<'a>, Error>>>,
126 awaiting_readers: Vec<Task<Result<EveReader<M>, Error>>>,
127}
128
129impl<'a, M: Send + 'static> SpawnContext<'a, M> {
130 fn spawn_suricata(args: &Config) -> Result<std::process::Child, Error> {
131 let mut command = std::process::Command::new(args.exe_path.to_str().unwrap());
132 let server_args: Vec<String> = vec![
133 "-c",
134 args.materialize_config_to.to_str().unwrap(),
135 "--capture-plugin=ipc-plugin",
136 ]
137 .into_iter()
138 .map(String::from)
139 .collect();
140
141 command
142 .args(server_args)
143 .stdin(std::process::Stdio::null())
144 .stderr(std::process::Stdio::piped())
145 .stdout(std::process::Stdio::piped());
146 info!("Spawning {:?}", command);
147 command.spawn().map_err(Error::Io)
148 }
149 fn suricata_output_stream(
152 process: &mut std::process::Child,
153 ) -> impl Stream<Item = Result<Result<String, String>, Error>> {
154 let stdout_complete = {
155 let o = process.stdout.take().unwrap();
156 let o = smol::Unblock::new(o);
157 let reader = smol::io::BufReader::new(o);
158 reader
159 .lines()
160 .map(move |t| match t {
161 Ok(l) => Ok(Ok(l)),
162 Err(e) => Err(Error::Io(e)),
163 })
164 .fuse()
165 };
166 let stderr_complete = {
167 let o = process.stderr.take().unwrap();
168 let o = smol::Unblock::new(o);
169 let reader = smol::io::BufReader::new(o);
170 reader
171 .lines()
172 .map(move |t| match t {
173 Ok(l) => Ok(Err(l)),
174 Err(e) => Err(Error::Io(e)),
175 })
176 .fuse()
177 };
178 smol::stream::or(stdout_complete, stderr_complete).boxed()
179 }
180
181 pub fn new(
191 args: &Config,
192 ) -> Result<
193 (
194 SpawnContext<'a, M>,
195 impl Stream<Item = Result<Result<String, String>, Error>>,
196 ),
197 Error,
198 > {
199 if (args.max_pending_packets as usize) < args.ipc_plugin.allocation_batch_size {
200 return Err(Error::Custom {
201 msg: "Max pending packets must be larger than IPC allocation batch".into(),
202 });
203 }
204 let opt_size = args.buffer_size.clone();
206
207 let awaiting_readers: Vec<_> = args
208 .outputs
209 .iter()
210 .flat_map(|c| connect_output::<M>(c, opt_size.clone()))
211 .collect();
212
213 debug!("Readers are listening, starting suricata");
214
215 let (ipc_plugin, servers) = args.ipc_plugin.clone().into_plugin()?;
216 args.materialize(ipc_plugin)?;
217
218 let awaiting_servers: Vec<Task<Result<ConnectedIpc, Error>>> = servers
219 .into_iter()
220 .map(|s| smol::spawn(async move { s.accept().map_err(Error::PacketIpc) }))
221 .collect();
222
223 let mut process = Self::spawn_suricata(&args)?;
224 debug!("Spawn complete");
225
226 let output_streams = Self::suricata_output_stream(&mut process);
227 let context = SpawnContext {
228 process: Some(process),
229 awaiting_servers,
230 awaiting_readers,
231 };
232 debug!("Return stream and ctx");
233 Ok((context, output_streams))
234 }
235}
236
237impl<'a, T> Drop for SpawnContext<'a, T> {
238 fn drop(&mut self) {
239 let process = match std::mem::replace(&mut self.process, None) {
240 Some(process) => process,
241 None => return,
242 };
243 let pid = process.id() as _;
244 unsafe { libc::kill(pid, libc::SIGKILL) };
246 }
247}
248
249pub struct Ids<'a, T> {
250 close_grace_period: Option<Duration>,
251 readers: Vec<EveReader<T>>,
252 process: Option<std::process::Child>,
253 ipc_servers: Vec<packet_ipc::ConnectedIpc<'a>>,
254}
255
256unsafe impl<'a, T> Send for Ids<'a, T> {}
257unsafe impl<'a, T> Sync for Ids<'a, T> {}
258
259impl<'a, T> Drop for Ids<'a, T> {
260 fn drop(&mut self) {
261 let _ = self.close();
262
263 let mut process = match std::mem::replace(&mut self.process, None) {
264 Some(process) => process,
265 None => return,
266 };
267
268 let pid = process.id() as _;
270 unsafe { libc::kill(pid, libc::SIGTERM) };
271
272 if let Some(close_grace_period) = self.close_grace_period {
273 smol::block_on(or(
274 smol::unblock(move || {
275 if let Err(e) = process.wait() {
276 error!(
277 "Unexpected error while waiting on suricata process: {:?}",
278 e
279 );
280 }
281 }),
282 async move {
283 smol::Timer::after(close_grace_period).await;
285 unsafe { libc::kill(pid, libc::SIGKILL) };
287 },
288 ));
289 } else if let Err(e) = process.kill() {
290 error!("Failed to stop suricata process: {:?}", e);
291 }
292 }
293}
294
295impl<'a, M> Ids<'a, M> {
296 pub fn send<'b, T: AsIpcPacket + 'a>(
297 &'a self,
298 packets: &'b [T],
299 server_id: usize,
300 ) -> Result<usize, Error> {
301 if let Some(ipc_server) = self.ipc_servers.get(server_id) {
302 let packets_sent = packets.len();
303 ipc_server.send(packets).map_err(Error::PacketIpc)?;
304 Ok(packets_sent)
305 } else {
306 Err(Error::Custom {
307 msg: "Cannot send when Ids already closed.".to_string(),
308 })
309 }
310 }
311
312 pub fn close(&mut self) -> Result<(), Error> {
313 for mut server in self.ipc_servers.drain(..) {
314 server.close().map_err(Error::PacketIpc)?
315 }
316 Ok(())
317 }
318
319 pub fn take_readers(&mut self) -> Vec<EveReader<M>> {
320 std::mem::replace(&mut self.readers, vec![])
321 }
322
323 pub fn reload_rules(&self) -> bool {
324 if let Some(ref p) = self.process {
325 unsafe { libc::kill(p.id() as _, libc::SIGUSR2) == 0 }
326 } else {
327 false
328 }
329 }
330
331 pub async fn new_with_spawn_context(
332 args: Config,
333 mut spawn_context: SpawnContext<'a, M>,
334 ) -> Result<Ids<'a, M>, Error> {
335 if (args.max_pending_packets as usize) < args.ipc_plugin.allocation_batch_size {
336 return Err(Error::Custom {
337 msg: "Max pending packets must be larger than IPC allocation batch".into(),
338 });
339 }
340 let close_grace_period = args.close_grace_period.clone();
341
342 let pending_ipc_connections = std::mem::take(&mut spawn_context.awaiting_servers);
343 let awaiting_readers = std::mem::take(&mut spawn_context.awaiting_readers);
344
345 let connected_ipcs = async move {
346 let mut ipcs = Vec::with_capacity(pending_ipc_connections.len());
347 for ipc in pending_ipc_connections {
348 ipcs.push(ipc.await);
349 }
350 let ipcs: Result<Vec<_>, _> = ipcs.into_iter().collect();
351 ipcs
352 }
353 .await?;
354
355 debug!("IPC Connection formed");
356
357 let readers = async move {
358 let mut readers = Vec::with_capacity(awaiting_readers.len());
359 for connection in awaiting_readers {
360 readers.push(connection.await);
361 }
362 let readers: Result<Vec<_>, _> = readers.into_iter().collect();
363 readers
364 }
365 .await?;
366
367 debug!("Eve readers formed.");
368
369 if !readers.is_empty() {
370 debug!("{} Eve Readers connected", readers.len());
371 }
372
373 Ok(Ids {
374 close_grace_period: close_grace_period,
375 readers: readers,
376 process: (&mut spawn_context).process.take(),
377 ipc_servers: connected_ipcs,
378 })
379 }
380
381 pub async fn new(args: Config) -> Result<Ids<'a, M>, Error>
382 where
383 M: Send + 'static,
384 {
385 let (spawn_ctx, stdout_stream) = SpawnContext::new(&args)?;
386 let pid: u32 = spawn_ctx
387 .process
388 .as_ref()
389 .map(|p| p.id())
390 .ok_or(Error::Custom {
391 msg: String::from("Missing process."),
392 })?;
393
394 let stdout_fut = stdout_stream.for_each(move |r| match r {
395 Err(io) => {
396 error!("Fatal io Error ({}) {:?}", pid, io)
397 }
398 Ok(Ok(line)) => {
399 debug!("[Suricata ({})] {}", pid, line);
400 }
401 Ok(Err(line)) => {
402 error!("[Suricata ({})] {}", pid, line);
403 }
404 });
405 smol::spawn(stdout_fut).detach();
406
407 info!("SpawnContext created");
408
409 Self::new_with_spawn_context(args, spawn_ctx).await
410 }
411}
412
413fn connect_output<M: Send + 'static>(
414 output: &Box<dyn Output + Send + Sync>,
415 opt_size: Option<usize>,
416) -> Option<smol::Task<Result<EveReader<M>, Error>>> {
417 if let Some(path) = output.eve().listener(&output.output_type()) {
418 let r = match connect_uds(path, output.output_type().clone(), opt_size) {
419 Err(e) => smol::spawn(async move { Err(e) }),
420 Ok(t) => t,
421 };
422 Some(r)
423 } else {
424 None
425 }
426}
427
428fn connect_uds<M: Send + 'static>(
429 path: PathBuf,
430 output_type: OutputType,
431 opt_size: Option<usize>,
432) -> Result<smol::Task<Result<EveReader<M>, Error>>, Error> {
433 debug!(
434 "Spawning acceptor for uds connection from suricata for {:?}",
435 path
436 );
437 if path.exists() {
438 std::fs::remove_file(&path)?;
439 }
440 debug!("Listening to {:?} for event type {:?}", path, output_type);
441 let listener = std::os::unix::net::UnixListener::bind(path.clone()).map_err(Error::from)?;
442 let r = match smol::Async::new(listener).map_err(Error::from) {
443 Err(e) => smol::spawn(async move { Err(e) }),
444 Ok(listener) => smol::spawn(async move {
445 listener.accept().await.map_err(Error::from).map(|t| {
446 let (uds_connection, uds_addr) = t;
447
448 debug!("UDS connection formed from {:?}", uds_addr);
449
450 if let Some(sz) = opt_size {
451 EveReader::with_capacity(path, output_type, uds_connection, sz)
452 } else {
453 EveReader::new(path, output_type, uds_connection)
454 }
455 })
456 }),
457 };
458 Ok(r)
459}