1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
//! Channel-based event I/O processors.
//!
//! This crate allows to collect events from text files, pcap files, and Apache
//! Kafka servers, distribute them to multiple threads, and optionally collect
//! them to send to Kafka.

pub mod fluentd;
pub mod kafka;
pub mod mbox;
pub mod pcap;
mod pipeline;
pub mod text;

pub use pipeline::split;
use std::error;
use std::fmt;

/// A trait for a data source that produces messages of type `Data`.
pub trait Input {
    type Data;
    type Ack;

    /// Fetches events and send them as `Data`. It also receives and processes
    /// `Ack`, which acknowledges the receipt of a certain `Data`.
    fn run(self) -> Result<(), Error>;
}

/// A trait for a single event from any type of data source.
pub trait Event {
    type Ack;

    fn raw(&self) -> &[u8];
    fn time(&self) -> u64;
    fn ack(&self) -> Self::Ack;
}

/// A raw event as a byte sequence.
#[derive(Debug)]
pub struct BareEvent {
    pub raw: Vec<u8>,
    pub seq_no: u64,
}

impl Event for BareEvent {
    type Ack = u64;

    #[must_use]
    fn raw(&self) -> &[u8] {
        self.raw.as_slice()
    }

    #[must_use]
    fn time(&self) -> u64 {
        self.seq_no
    }

    #[must_use]
    fn ack(&self) -> Self::Ack {
        self.seq_no
    }
}

/// The error type for event I/O operations.
#[derive(Debug)]
pub enum Error {
    /// The data channel was closed.
    ChannelClosed,
    /// Cannot commit consumed events to the source.
    CannotCommit(Box<dyn error::Error>),
    /// Cannot fetch events from the source.
    CannotFetch(Box<dyn error::Error>),
    /// Cannot parse a message.
    InvalidMessage(Box<dyn error::Error>),
    /// An internal error that should not occur.
    Fatal(String),
    /// Too many events to handle.
    TooManyEvents(usize),
}

impl error::Error for Error {
    #[must_use]
    fn source(&self) -> Option<&(dyn error::Error + 'static)> {
        match self {
            Self::CannotCommit(e) | Self::CannotFetch(e) | Self::InvalidMessage(e) => {
                Some(e.as_ref())
            }
            _ => None,
        }
    }
}

impl fmt::Display for Error {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            Self::ChannelClosed => write!(f, "data channel closed"),
            Self::CannotCommit(e) => write!(f, "cannot commit to Kafka: {}", e),
            Self::CannotFetch(e) => write!(f, "cannot fetch message from Kafka: {}", e),
            Self::InvalidMessage(e) => write!(f, "invalid MessagePack format: {}", e),
            Self::Fatal(s) => write!(f, "fatal error: {}", s),
            Self::TooManyEvents(n) => write!(
                f,
                "cannot handle {} events (expected < {})",
                n,
                u32::max_value()
            ),
        }
    }
}