fiddler/
lib.rs

1//! Fast and flexible data stream processor written in Rust
2//!
3//! Provides a library for building data streaming pipelines using a
//! declarative, YAML-based configuration for data aggregation and
//! transformation.
6use async_trait::async_trait;
7use serde::{Deserialize, Serialize};
8use serde_yaml::Value;
9use std::collections::HashMap;
10use thiserror::Error;
11use tokio::sync::oneshot;
12use tokio::time::Duration;
13
14/// Contains configuration and module registration primitives for module development
15pub mod config;
16pub use runtime::Runtime;
17pub(crate) mod modules;
18mod runtime;
19
20/// BatchingPolicy defines common configuration items for used in batching operations
21/// such as OutputBatch modules
22#[derive(Deserialize, Default, Clone)]
23pub struct BatchingPolicy {
24    duration: Option<Duration>,
25    size: Option<usize>,
26}
27
28/// MessageType is utilized by plugins to identiy which type of message are they sending
29/// to the runtime.  [MessageType::Default] is utilized for processing data that will be
30/// sent to their configured outputs.  [MessageType::BeginStream] and [MessageType::EndStream]
31/// will not be processed by the pipeline but are utilized to logically group messages
32/// together under a shared callback function.
33#[derive(Serialize, Deserialize, Default, Clone, Debug, PartialEq)]
34pub enum MessageType {
35    #[default]
36    /// Default message to be processed
37    Default,
38    /// Received from Input modules that indicates the start of an event stream with a shared callback
39    /// This event is used for tracking state only and will not be processed
40    BeginStream(String),
41    /// Received from Input modules that indicates the end of an event stream with a shared callback
42    /// This event is used for tracking state only and will not be processed
43    EndStream(String),
44}
45
46/// Message is the uniform struct utilized within all modules of fiddler.
47/// ```
48/// # use fiddler::Message;
49/// # use std::collections::HashMap;
50/// let content = "This is a message being processed";
51/// let message = Message{
52///     bytes: content.as_bytes().into(),
53///     ..Default::default()
54/// };
55/// ```
56#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq)]
57pub struct Message {
58    /// raw bytes of of the message to be collected, processed, and sent
59    pub bytes: Vec<u8>,
60    /// metadata about the message
61    pub metadata: HashMap<String, Value>,
62    /// Specified message types.  If [MessageType::Parent] is provided, no processing of the event
63    /// will take place, but the callback will be called when all child messages
64    /// have been processed.  This gives modules the ability to tier callback actions.  Such as a
65    /// module that receives a file from queue and then processes all the lines in said file.
66    /// Each line will be it's own [Message]; where the parent callback will delete the message
67    /// once all lines are processed
68    pub message_type: MessageType,
69    /// [Optional] Specifies the stream_id of the associated stream of messages with a shared callback
70    pub stream_id: Option<String>,
71}
72
73/// Type alias for Vec<[crate::Message]>, a grouping of Messages being produced
74pub type MessageBatch = Vec<Message>;
75
76/// Acknowledgement channel utilized to send feedback to input module on the successful
77/// or unsuccessful processing of event emited by input.
78pub type CallbackChan = oneshot::Sender<Status>;
79
80/// Helper function to generate transmitting and receiver pair that can be utilized for
81/// the [crate::CallbackChan] for input modules.
82pub fn new_callback_chan() -> (oneshot::Sender<Status>, oneshot::Receiver<Status>) {
83    oneshot::channel()
84}
85
/// Status returned through the [crate::CallbackChan] to input modules.
///
/// Derives `PartialEq`/`Eq` so callers and tests can compare statuses directly.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum Status {
    /// The message was fully processed successfully.
    Processed,
    /// Processing failed, partially or fully; carries the list of failures received.
    Errored(Vec<String>),
}
94
95/// Closer trait utilized by input and output modules to optionally gracefully
96/// exit upon successful processing of the pipeline
97#[async_trait]
98pub trait Closer {
99    /// gracefully terminate resources prior to shutdown of processing pipeline
100    async fn close(&mut self) -> Result<(), Error> {
101        Ok(())
102    }
103}
104
105/// Input module trait to insert [crate::Message] into the processing pipeline.
106#[async_trait]
107pub trait Input: Closer {
108    /// Read single message from the input module and expected return a tuple
109    /// containing the [crate::Message] and a [crate::CallbackChan] for reporting status back.
110    async fn read(&mut self) -> Result<(Message, Option<CallbackChan>), Error>;
111}
112
113/// BatchInput module trait to insert one to may [crate::Message] into the pipeline.
114/// InputBatch is currently not yet introduced into the runtime.
115#[async_trait]
116pub trait InputBatch: Closer {
117    /// Read multiple messages from the input module and expected return a tuple
118    /// containing the [crate::MessageBatch] and a [crate::CallbackChan] for reporting status back.
119    fn read_batch(&mut self) -> Result<(MessageBatch, Option<CallbackChan>), Error>;
120}
121
122/// Output module trait to write a single [crate::Message] to the output
123#[async_trait]
124pub trait Output: Closer {
125    /// writes message to the output module
126    async fn write(&mut self, message: Message) -> Result<(), Error>;
127}
128
129/// Batching output module utilized to write many messages to the output
130/// based on batch_size and provided interval.  Defaults are `batch_size: 500`, `interval: 10 seconds`.
131/// The queuing mechanism is provided by the runtime and will call `write_batch` if the batch size
132/// has been reached, or the desired interval has passed, whichever comes first.
133#[async_trait]
134pub trait OutputBatch: Closer {
135    /// Write [crate::MessageBatch] to the output in accordance with the provided batching policy
136    async fn write_batch(&mut self, message_batch: MessageBatch) -> Result<(), Error>;
137
138    /// returns the desired size of the [crate::MessageBatch] to provide the the output module
139    async fn batch_size(&self) -> usize {
140        500
141    }
142
143    /// returns the duration of how long the runtime should wait between sending [crate::MessageBatch] to
144    /// the output.
145    async fn interval(&self) -> Duration {
146        Duration::from_secs(10)
147    }
148}
149
150/// Processor is the processing module trait utilized to accept a single [crate::Message] and provide
151/// one to many additional messages through [crate::MessageBatch]
152#[async_trait]
153pub trait Processor: Closer {
154    /// process a givent [crate::Message] and return the transformed, one to many messages to continue
155    /// on the pipeline.
156    async fn process(&self, message: Message) -> Result<MessageBatch, Error>;
157}
158
159/// Enum to capture errors occured through the pipeline
160#[derive(Debug, Error)]
161pub enum Error {
162    /// Yaml parsing errors found within the declarative language proved
163    #[error("UnableToSerializeObject: {0}")]
164    UnableToSerializeYamlObject(#[from] serde_yaml::Error),
165
166    /// JSON serialization is primarily utilized as a preparser to passing the declarative
167    /// language to the given module by utilizing jsonschema to validate the input.  This is unlikely
168    /// to occur in practice since the yaml configuration object is converted to json for this step.
169    #[error("UnableToSerializeObject: {0}")]
170    UnableToSerializeJsonObject(#[from] serde_json::Error),
171
172    /// Validation errors discovered by the jsonschema evaluation of the declarative configuration language
173    /// provided to a given module
174    #[error("ValidationError: {0}")]
175    Validation(String),
176
177    /// Error with the processing pipeline due to a failure of internal libraries or utilized modules
178    #[error("ExecutionError: {0}")]
179    ExecutionError(String),
180
181    /// EndOfInput is a error enum variant to indicate that the input module has finished and will not
182    /// receive additional input.  This error triggers a graceful shutdown of the processing pipeline
183    #[error("EndOfInput")]
184    EndOfInput,
185
186    /// Unable to secure internal mutex lock
187    #[error("InternalServerError")]
188    UnableToSecureLock,
189
190    /// A plugin of the same category (input, processing, output) has already been provided
191    #[error("DuplicateRegisteredName: {0}")]
192    DuplicateRegisteredName(String),
193
194    /// The provided jsonschema configuration for a module in incorrect
195    #[error("InvalidValidationSchema: {0}")]
196    InvalidValidationSchema(String),
197
198    /// Configuration provided to a module is invalid
199    #[error("ConfigurationValidationFailed: {0}")]
200    ConfigFailedValidation(String),
201
202    /// Module is not registered with the runtime.
203    #[error("ConfigurationItemNotFound: {0}")]
204    ConfigurationItemNotFound(String),
205
206    /// Not yet implemented functionality
207    #[error("NotYetImplemented")]
208    NotYetImplemented,
209
210    /// Failure to send to an internal channel processing [crate::Message]s
211    #[error("PipelineProcessingError: {0}")]
212    UnableToSendToChannel(String),
213
214    /// Failure to receive from internal raw channel
215    #[error("ChannelRecvError: {0}")]
216    RecvChannelError(#[from] flume::RecvError),
217
218    /// Processing module failed with an unrecoverable error.  Processing of [crate::Message] is stopped and
219    /// [crate::Status] is returned to the input module once all messages in this lineage have been processed
220    #[error("ProcessorFailure: {0}")]
221    ProcessingError(String),
222
223    /// Conditional check has failed for [crate::Message], such as use with [crate::modules::processors::switch]
224    /// conditions
225    #[error("ConditionalCheckfailed")]
226    ConditionalCheckfailed,
227
228    /// Error encountered while calling [crate::Input::read] on an input module
229    #[error("InputError: {0}")]
230    InputError(String),
231
232    /// Error encountered while calling [crate::Output::write] or [crate::OutputBatch::write_batch] on an output module
233    #[error("OutputError: {0}")]
234    OutputError(String),
235
236    /// Error returned by input module to indicate there are no messages to process
237    #[error("NoInputToReturn")]
238    NoInputToReturn,
239}