suricata_ipc/
lib.rs

1//! # suricata-rs
2//!
3//! Provide access to suricata via a library-like interface. Allows packets to be sent to suricata
4//! and alerts received.
5//!
6//! ```rust,no_run
7//! # use suricata_ipc::prelude::*;
8//! # use futures::TryStreamExt;
9//! # use std::path::PathBuf;
10//!
11//! struct Packet {
12//!     data: Vec<u8>,
13//!     timestamp: std::time::SystemTime,
14//! }
15//!
16//! impl AsIpcPacket for Packet {
17//!     fn timestamp(&self) -> &std::time::SystemTime {
18//!         &self.timestamp
19//!     }
20//!     fn data(&self) -> &[u8] {
21//!         self.data.as_slice()
22//!     }
23//! }
24//!
25//! fn main() {
26//!     let resources = PathBuf::from(env!("CARGO_MANIFEST_DIR"))
27//!         .join("resources");
28//!     let config = Config::default();
29//!     let rules = Rules::from_path(resources.join("test.rules")).expect("Could not parse rules");
30//!     let cache: IntelCache<Rule> = rules.into();
31//!     cache.materialize_rules(config.rule_path.clone()).expect("Failed to materialize rules");
32//!
33//!     smol::block_on(async move {
34//!         let mut ids = Ids::new(config).await.expect("Failed to create ids");
35//!         let readers: Vec<EveReader<EveMessage>> = ids.take_readers();
36//!         let readers = futures::stream::select_all(readers.into_iter());
37//!
38//!         let packets: Vec<Packet> = vec![];
39//!         ids.send(packets.as_slice(), 0).expect("Failed to send packets");
40//!
41//!         let alerts: Result<Vec<_>, Error> = readers.try_collect().await;
42//!         let alerts = alerts.expect("Failed to parse alerts");
43//!
44//!         for eve_msgs in alerts {
45//!             for eve in eve_msgs {
46//!                 println!("Eve={:?}", eve);
47//!                 if let Some(intel) = cache.observed(eve) {
48//!                     if let Observed::Alert { rule, message: _, ts: _} = intel {
49//!                         println!("Rule={:?}", rule);
50//!                     }
51//!                 }
52//!             }
53//!         }
54//!     })
55//! }
56//! ```
57#![deny(unused_must_use, unused_imports, bare_trait_objects)]
58pub mod config;
59mod errors;
60mod eve;
61mod intel;
62#[allow(dead_code)]
63#[cfg(feature = "protobuf")]
64mod serde_helpers;
65
66pub mod prelude {
67    pub use super::config::Config;
68    pub use super::errors::Error;
69    pub use super::eve::*;
70    pub use super::intel::{
71        CachedRule, IdsKey, IntelCache, Observable, Observed, Rule, Rules, Tracer,
72    };
73    #[cfg(feature = "protobuf")]
74    pub use super::proto;
75    pub use super::Ids;
76    pub use packet_ipc::AsIpcPacket;
77
78    pub use chrono;
79}
80
81#[cfg(feature = "protobuf")]
82pub(crate) use eve::parse_date_time;
83
#[allow(missing_docs)]
#[cfg(feature = "protobuf")]
pub mod proto {
    tonic::include_proto!("suricata");

    impl crate::intel::Observable for Eve {
        /// Timestamp of the event as a UTC `DateTime`.
        ///
        /// Falls back to the current time when the message carries no timestamp, or when the
        /// carried seconds/nanos do not form a representable date (for example negative nanos).
        /// The checked conversions are used so a malformed message cannot panic the caller.
        fn timestamp(&self) -> chrono::DateTime<chrono::Utc> {
            self.timestamp
                .as_ref()
                .and_then(|ts| {
                    // Reject negative nanos instead of letting an `as` cast wrap them around.
                    let nanos: u32 = std::convert::TryFrom::try_from(ts.nanos).ok()?;
                    chrono::NaiveDateTime::from_timestamp_opt(ts.seconds, nanos)
                })
                .map(|ts| chrono::DateTime::from_utc(ts, chrono::Utc))
                .unwrap_or_else(chrono::Utc::now)
        }

        /// Rule key (`gid`/`sid`) of the alert carried by this event, if it carries one.
        fn key(&self) -> Option<crate::intel::IdsKey> {
            self.alert.as_ref().map(|a| crate::intel::IdsKey {
                gid: a.gid as _,
                sid: a.signature_id as _,
            })
        }
    }
}
108
109use crate::config::output::{Output, OutputType};
110use config::Config;
111use log::*;
112use packet_ipc::ConnectedIpc;
113use prelude::*;
114use smol::future::or;
115use smol::io::AsyncBufReadExt;
116use smol::stream::{Stream, StreamExt};
117use smol::Task;
118use std::path::PathBuf;
119use std::time::Duration;
120
121//const READER_BUFFER_SIZE: usize = 128;
122
123pub struct SpawnContext<'a, M> {
124    process: Option<std::process::Child>,
125    awaiting_servers: Vec<Task<Result<packet_ipc::ConnectedIpc<'a>, Error>>>,
126    awaiting_readers: Vec<Task<Result<EveReader<M>, Error>>>,
127}
128
129impl<'a, M: Send + 'static> SpawnContext<'a, M> {
130    fn spawn_suricata(args: &Config) -> Result<std::process::Child, Error> {
131        let mut command = std::process::Command::new(args.exe_path.to_str().unwrap());
132        let server_args: Vec<String> = vec![
133            "-c",
134            args.materialize_config_to.to_str().unwrap(),
135            "--capture-plugin=ipc-plugin",
136        ]
137        .into_iter()
138        .map(String::from)
139        .collect();
140
141        command
142            .args(server_args)
143            .stdin(std::process::Stdio::null())
144            .stderr(std::process::Stdio::piped())
145            .stdout(std::process::Stdio::piped());
146        info!("Spawning {:?}", command);
147        command.spawn().map_err(Error::Io)
148    }
149    /// A stream with stdout/error from suricata combined in a Result<String, String>
150    /// Useful, to watch for completion on startup and to delegate the logging to the caller.
151    fn suricata_output_stream(
152        process: &mut std::process::Child,
153    ) -> impl Stream<Item = Result<Result<String, String>, Error>> {
154        let stdout_complete = {
155            let o = process.stdout.take().unwrap();
156            let o = smol::Unblock::new(o);
157            let reader = smol::io::BufReader::new(o);
158            reader
159                .lines()
160                .map(move |t| match t {
161                    Ok(l) => Ok(Ok(l)),
162                    Err(e) => Err(Error::Io(e)),
163                })
164                .fuse()
165        };
166        let stderr_complete = {
167            let o = process.stderr.take().unwrap();
168            let o = smol::Unblock::new(o);
169            let reader = smol::io::BufReader::new(o);
170            reader
171                .lines()
172                .map(move |t| match t {
173                    Ok(l) => Ok(Err(l)),
174                    Err(e) => Err(Error::Io(e)),
175                })
176                .fuse()
177        };
178        smol::stream::or(stdout_complete, stderr_complete).boxed()
179    }
180
181    ///
182    /// When suricata starts it will want to process rules, before connecting to the ipc sockets or alert sockets.
183    /// During this time it is still possible that suricata may not start, so we expose the SpawnContext along side a
184    /// Stream. The `SpawnContext` Should not be used by you (you jerk). The Stream however, should be watched for completion
185    /// When the Stream completes, you may consider Suricata dead. The streams element is a Result<String, String> representing
186    /// stdout, stderr respectively.
187    ///
188    /// Warning, you MUST consume the `Stream` if you don't Suricata will eventiually lock up.
189    /// If you are unsure about any of this use `Ids::new()`
190    pub fn new(
191        args: &Config,
192    ) -> Result<
193        (
194            SpawnContext<'a, M>,
195            impl Stream<Item = Result<Result<String, String>, Error>>,
196        ),
197        Error,
198    > {
199        if (args.max_pending_packets as usize) < args.ipc_plugin.allocation_batch_size {
200            return Err(Error::Custom {
201                msg: "Max pending packets must be larger than IPC allocation batch".into(),
202            });
203        }
204        //let close_grace_period = args.close_grace_period.clone();
205        let opt_size = args.buffer_size.clone();
206
207        let awaiting_readers: Vec<_> = args
208            .outputs
209            .iter()
210            .flat_map(|c| connect_output::<M>(c, opt_size.clone()))
211            .collect();
212
213        debug!("Readers are listening, starting suricata");
214
215        let (ipc_plugin, servers) = args.ipc_plugin.clone().into_plugin()?;
216        args.materialize(ipc_plugin)?;
217
218        let awaiting_servers: Vec<Task<Result<ConnectedIpc, Error>>> = servers
219            .into_iter()
220            .map(|s| smol::spawn(async move { s.accept().map_err(Error::PacketIpc) }))
221            .collect();
222
223        let mut process = Self::spawn_suricata(&args)?;
224        debug!("Spawn complete");
225
226        let output_streams = Self::suricata_output_stream(&mut process);
227        let context = SpawnContext {
228            process: Some(process),
229            awaiting_servers,
230            awaiting_readers,
231        };
232        debug!("Return stream and ctx");
233        Ok((context, output_streams))
234    }
235}
236
237impl<'a, T> Drop for SpawnContext<'a, T> {
238    fn drop(&mut self) {
239        let process = match std::mem::replace(&mut self.process, None) {
240            Some(process) => process,
241            None => return,
242        };
243        let pid = process.id() as _;
244        // Dont mess around!
245        unsafe { libc::kill(pid, libc::SIGKILL) };
246    }
247}
248
249pub struct Ids<'a, T> {
250    close_grace_period: Option<Duration>,
251    readers: Vec<EveReader<T>>,
252    process: Option<std::process::Child>,
253    ipc_servers: Vec<packet_ipc::ConnectedIpc<'a>>,
254}
255
256unsafe impl<'a, T> Send for Ids<'a, T> {}
257unsafe impl<'a, T> Sync for Ids<'a, T> {}
258
259impl<'a, T> Drop for Ids<'a, T> {
260    fn drop(&mut self) {
261        let _ = self.close();
262
263        let mut process = match std::mem::replace(&mut self.process, None) {
264            Some(process) => process,
265            None => return,
266        };
267
268        // Attempt to close nicely
269        let pid = process.id() as _;
270        unsafe { libc::kill(pid, libc::SIGTERM) };
271
272        if let Some(close_grace_period) = self.close_grace_period {
273            smol::block_on(or(
274                smol::unblock(move || {
275                    if let Err(e) = process.wait() {
276                        error!(
277                            "Unexpected error while waiting on suricata process: {:?}",
278                            e
279                        );
280                    }
281                }),
282                async move {
283                    // If process doesn't end during grace period, send it a sigkill
284                    smol::Timer::after(close_grace_period).await;
285                    // We already have a mutable borrow in process.wait(), send signal to pid
286                    unsafe { libc::kill(pid, libc::SIGKILL) };
287                },
288            ));
289        } else if let Err(e) = process.kill() {
290            error!("Failed to stop suricata process: {:?}", e);
291        }
292    }
293}
294
295impl<'a, M> Ids<'a, M> {
296    pub fn send<'b, T: AsIpcPacket + 'a>(
297        &'a self,
298        packets: &'b [T],
299        server_id: usize,
300    ) -> Result<usize, Error> {
301        if let Some(ipc_server) = self.ipc_servers.get(server_id) {
302            let packets_sent = packets.len();
303            ipc_server.send(packets).map_err(Error::PacketIpc)?;
304            Ok(packets_sent)
305        } else {
306            Err(Error::Custom {
307                msg: "Cannot send when Ids already closed.".to_string(),
308            })
309        }
310    }
311
312    pub fn close(&mut self) -> Result<(), Error> {
313        for mut server in self.ipc_servers.drain(..) {
314            server.close().map_err(Error::PacketIpc)?
315        }
316        Ok(())
317    }
318
319    pub fn take_readers(&mut self) -> Vec<EveReader<M>> {
320        std::mem::replace(&mut self.readers, vec![])
321    }
322
323    pub fn reload_rules(&self) -> bool {
324        if let Some(ref p) = self.process {
325            unsafe { libc::kill(p.id() as _, libc::SIGUSR2) == 0 }
326        } else {
327            false
328        }
329    }
330
331    pub async fn new_with_spawn_context(
332        args: Config,
333        mut spawn_context: SpawnContext<'a, M>,
334    ) -> Result<Ids<'a, M>, Error> {
335        if (args.max_pending_packets as usize) < args.ipc_plugin.allocation_batch_size {
336            return Err(Error::Custom {
337                msg: "Max pending packets must be larger than IPC allocation batch".into(),
338            });
339        }
340        let close_grace_period = args.close_grace_period.clone();
341
342        let pending_ipc_connections = std::mem::take(&mut spawn_context.awaiting_servers);
343        let awaiting_readers = std::mem::take(&mut spawn_context.awaiting_readers);
344
345        let connected_ipcs = async move {
346            let mut ipcs = Vec::with_capacity(pending_ipc_connections.len());
347            for ipc in pending_ipc_connections {
348                ipcs.push(ipc.await);
349            }
350            let ipcs: Result<Vec<_>, _> = ipcs.into_iter().collect();
351            ipcs
352        }
353        .await?;
354
355        debug!("IPC Connection formed");
356
357        let readers = async move {
358            let mut readers = Vec::with_capacity(awaiting_readers.len());
359            for connection in awaiting_readers {
360                readers.push(connection.await);
361            }
362            let readers: Result<Vec<_>, _> = readers.into_iter().collect();
363            readers
364        }
365        .await?;
366
367        debug!("Eve readers formed.");
368
369        if !readers.is_empty() {
370            debug!("{} Eve Readers connected", readers.len());
371        }
372
373        Ok(Ids {
374            close_grace_period: close_grace_period,
375            readers: readers,
376            process: (&mut spawn_context).process.take(),
377            ipc_servers: connected_ipcs,
378        })
379    }
380
381    pub async fn new(args: Config) -> Result<Ids<'a, M>, Error>
382    where
383        M: Send + 'static,
384    {
385        let (spawn_ctx, stdout_stream) = SpawnContext::new(&args)?;
386        let pid: u32 = spawn_ctx
387            .process
388            .as_ref()
389            .map(|p| p.id())
390            .ok_or(Error::Custom {
391                msg: String::from("Missing process."),
392            })?;
393
394        let stdout_fut = stdout_stream.for_each(move |r| match r {
395            Err(io) => {
396                error!("Fatal io Error ({}) {:?}", pid, io)
397            }
398            Ok(Ok(line)) => {
399                debug!("[Suricata ({})] {}", pid, line);
400            }
401            Ok(Err(line)) => {
402                error!("[Suricata ({})] {}", pid, line);
403            }
404        });
405        smol::spawn(stdout_fut).detach();
406
407        info!("SpawnContext created");
408
409        Self::new_with_spawn_context(args, spawn_ctx).await
410    }
411}
412
413fn connect_output<M: Send + 'static>(
414    output: &Box<dyn Output + Send + Sync>,
415    opt_size: Option<usize>,
416) -> Option<smol::Task<Result<EveReader<M>, Error>>> {
417    if let Some(path) = output.eve().listener(&output.output_type()) {
418        let r = match connect_uds(path, output.output_type().clone(), opt_size) {
419            Err(e) => smol::spawn(async move { Err(e) }),
420            Ok(t) => t,
421        };
422        Some(r)
423    } else {
424        None
425    }
426}
427
428fn connect_uds<M: Send + 'static>(
429    path: PathBuf,
430    output_type: OutputType,
431    opt_size: Option<usize>,
432) -> Result<smol::Task<Result<EveReader<M>, Error>>, Error> {
433    debug!(
434        "Spawning acceptor for uds connection from suricata for {:?}",
435        path
436    );
437    if path.exists() {
438        std::fs::remove_file(&path)?;
439    }
440    debug!("Listening to {:?} for event type {:?}", path, output_type);
441    let listener = std::os::unix::net::UnixListener::bind(path.clone()).map_err(Error::from)?;
442    let r = match smol::Async::new(listener).map_err(Error::from) {
443        Err(e) => smol::spawn(async move { Err(e) }),
444        Ok(listener) => smol::spawn(async move {
445            listener.accept().await.map_err(Error::from).map(|t| {
446                let (uds_connection, uds_addr) = t;
447
448                debug!("UDS connection formed from {:?}", uds_addr);
449
450                if let Some(sz) = opt_size {
451                    EveReader::with_capacity(path, output_type, uds_connection, sz)
452                } else {
453                    EveReader::new(path, output_type, uds_connection)
454                }
455            })
456        }),
457    };
458    Ok(r)
459}