1pub mod fluentd;
8#[cfg(feature = "kafka")]
9pub mod kafka;
10pub mod mbox;
11#[cfg(feature = "ndarray")]
12pub mod ndarray;
13#[cfg(feature = "pcap")]
14pub mod pcap;
15mod pipeline;
16pub mod text;
17
18use std::error;
19use std::fmt;
20
21pub use self::pipeline::split;
22
23pub trait Input {
25 type Data;
26 type Ack;
27
28 fn run(self) -> Result<(), Error>;
36}
37
38pub type SeqNo = usize;
39
40pub trait Event {
42 type Ack;
43
44 fn raw(&self) -> &[u8];
45 fn time(&self) -> SeqNo;
46 fn ack(&self) -> Self::Ack;
47}
48
49#[derive(Debug)]
51pub struct BareEvent {
52 pub raw: Vec<u8>,
53 pub seq_no: SeqNo,
54}
55
56impl Event for BareEvent {
57 type Ack = SeqNo;
58
59 fn raw(&self) -> &[u8] {
60 self.raw.as_slice()
61 }
62
63 fn time(&self) -> SeqNo {
64 self.seq_no
65 }
66
67 fn ack(&self) -> Self::Ack {
68 self.seq_no
69 }
70}
71
72#[derive(Debug)]
74pub enum Error {
75 ChannelClosed,
77 CannotCommit(Box<dyn error::Error>),
79 CannotFetch(Box<dyn error::Error>),
81 InvalidMessage(Box<dyn error::Error>),
83 Fatal(String),
85 TooManyEvents(usize),
87}
88
89impl error::Error for Error {
90 fn source(&self) -> Option<&(dyn error::Error + 'static)> {
91 match self {
92 Self::CannotCommit(e) | Self::CannotFetch(e) | Self::InvalidMessage(e) => {
93 Some(e.as_ref())
94 }
95 _ => None,
96 }
97 }
98}
99
100impl fmt::Display for Error {
101 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
102 match self {
103 Self::ChannelClosed => write!(f, "data channel closed"),
104 Self::CannotCommit(e) => write!(f, "cannot commit to Kafka: {e}"),
105 Self::CannotFetch(e) => write!(f, "cannot fetch message from Kafka: {e}"),
106 Self::InvalidMessage(e) => write!(f, "invalid MessagePack format: {e}"),
107 Self::Fatal(s) => write!(f, "fatal error: {s}"),
108 Self::TooManyEvents(n) => {
109 write!(f, "cannot handle {n} events (expected < {})", u32::MAX)
110 }
111 }
112 }
113}