eventio/
lib.rs

//! Channel-based event I/O processors.
//!
//! This crate collects events from text files, pcap files, and Apache Kafka
//! servers, distributes them to multiple threads, and optionally collects them
//! to send to Kafka.
6
7pub mod fluentd;
8#[cfg(feature = "kafka")]
9pub mod kafka;
10pub mod mbox;
11#[cfg(feature = "ndarray")]
12pub mod ndarray;
13#[cfg(feature = "pcap")]
14pub mod pcap;
15mod pipeline;
16pub mod text;
17
18use std::error;
19use std::fmt;
20
21pub use self::pipeline::split;
22
23/// A trait for a data source that produces messages of type `Data`.
24pub trait Input {
25    type Data;
26    type Ack;
27
28    /// Fetches events and send them as `Data`. It also receives and processes
29    /// `Ack`, which acknowledges the receipt of a certain `Data`.
30    ///
31    /// # Errors
32    ///
33    /// Returns an error if it fails to fetch events, or receives an invalid
34    /// `Data` or `Ack`.
35    fn run(self) -> Result<(), Error>;
36}
37
/// A sequence number.
///
/// [`BareEvent`] stores one per event and returns it from both
/// [`Event::time`] and [`Event::ack`].
pub type SeqNo = usize;
39
40/// A trait for a single event from any type of data source.
41pub trait Event {
42    type Ack;
43
44    fn raw(&self) -> &[u8];
45    fn time(&self) -> SeqNo;
46    fn ack(&self) -> Self::Ack;
47}
48
49/// A raw event as a byte sequence.
50#[derive(Debug)]
51pub struct BareEvent {
52    pub raw: Vec<u8>,
53    pub seq_no: SeqNo,
54}
55
56impl Event for BareEvent {
57    type Ack = SeqNo;
58
59    fn raw(&self) -> &[u8] {
60        self.raw.as_slice()
61    }
62
63    fn time(&self) -> SeqNo {
64        self.seq_no
65    }
66
67    fn ack(&self) -> Self::Ack {
68        self.seq_no
69    }
70}
71
/// The error type for event I/O operations.
///
/// The `CannotCommit`, `CannotFetch`, and `InvalidMessage` variants carry the
/// underlying error as a boxed trait object; the other variants have no
/// underlying cause.
#[derive(Debug)]
pub enum Error {
    /// The data channel was closed.
    ChannelClosed,
    /// Cannot commit consumed events to the source.
    CannotCommit(Box<dyn error::Error>),
    /// Cannot fetch events from the source.
    CannotFetch(Box<dyn error::Error>),
    /// Cannot parse a message.
    InvalidMessage(Box<dyn error::Error>),
    /// An internal error that should not occur.
    Fatal(String),
    /// Too many events to handle. The payload is the offending event count.
    TooManyEvents(usize),
}
88
89impl error::Error for Error {
90    fn source(&self) -> Option<&(dyn error::Error + 'static)> {
91        match self {
92            Self::CannotCommit(e) | Self::CannotFetch(e) | Self::InvalidMessage(e) => {
93                Some(e.as_ref())
94            }
95            _ => None,
96        }
97    }
98}
99
100impl fmt::Display for Error {
101    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
102        match self {
103            Self::ChannelClosed => write!(f, "data channel closed"),
104            Self::CannotCommit(e) => write!(f, "cannot commit to Kafka: {e}"),
105            Self::CannotFetch(e) => write!(f, "cannot fetch message from Kafka: {e}"),
106            Self::InvalidMessage(e) => write!(f, "invalid MessagePack format: {e}"),
107            Self::Fatal(s) => write!(f, "fatal error: {s}"),
108            Self::TooManyEvents(n) => {
109                write!(f, "cannot handle {n} events (expected < {})", u32::MAX)
110            }
111        }
112    }
113}