fiddler/lib.rs
//! Fast and flexible data stream processor written in Rust
//!
//! Provides a library for building data streaming pipelines using a
//! declarative YAML-based configuration for data aggregation and
//! transformation
6use async_trait::async_trait;
7use serde::{Deserialize, Serialize};
8use serde_yaml::Value;
9use std::collections::HashMap;
10use thiserror::Error;
11use tokio::sync::oneshot;
12use tokio::time::Duration;
13
14/// Contains configuration and module registration primitives for module development
15pub mod config;
16pub use runtime::Runtime;
17pub(crate) mod modules;
18mod runtime;
19
20/// BatchingPolicy defines common configuration items for used in batching operations
21/// such as OutputBatch modules
22#[derive(Deserialize, Default, Clone)]
23pub struct BatchingPolicy {
24 duration: Option<Duration>,
25 size: Option<usize>,
26}
27
/// MessageType is utilized by plugins to identify which type of message they are sending
/// to the runtime. [MessageType::Default] is utilized for processing data that will be
/// sent to their configured outputs. [MessageType::BeginStream] and [MessageType::EndStream]
/// will not be processed by the pipeline but are utilized to logically group messages
/// together under a shared callback function.
#[derive(Serialize, Deserialize, Default, Clone, Debug, PartialEq)]
pub enum MessageType {
    #[default]
    /// Default message to be processed. This is the variant produced by `Default::default()`.
    Default,
    /// Received from Input modules to indicate the start of an event stream with a shared callback.
    /// This event is used for tracking state only and will not be processed.
    ///
    /// NOTE(review): the `String` payload is presumably the stream identifier — confirm.
    BeginStream(String),
    /// Received from Input modules to indicate the end of an event stream with a shared callback.
    /// This event is used for tracking state only and will not be processed.
    ///
    /// NOTE(review): the `String` payload is presumably the stream identifier — confirm.
    EndStream(String),
}
45
/// Message is the uniform struct utilized within all modules of fiddler.
/// ```
/// # use fiddler::Message;
/// # use std::collections::HashMap;
/// let content = "This is a message being processed";
/// let message = Message{
///     bytes: content.as_bytes().into(),
///     ..Default::default()
/// };
/// ```
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq)]
pub struct Message {
    /// Raw bytes of the message to be collected, processed, and sent
    pub bytes: Vec<u8>,
    /// Metadata about the message, keyed by name
    pub metadata: HashMap<String, Value>,
    /// Kind of message being sent, see [MessageType]. [MessageType::Default] messages are
    /// processed by the pipeline. [MessageType::BeginStream] and [MessageType::EndStream] are
    /// not processed; they are used for tracking state only, marking the start and end of a
    /// stream of messages that share a callback. This gives modules the ability to tier
    /// callback actions, such as a module that receives a file from a queue and then processes
    /// all the lines in said file: each line is its own [Message], and the shared callback can
    /// delete the queued message once all lines are processed.
    pub message_type: MessageType,
    /// Optional: specifies the stream_id of the associated stream of messages with a shared callback
    pub stream_id: Option<String>,
}
72
/// Type alias for `Vec<Message>` (see [crate::Message]): a grouping of messages being produced
pub type MessageBatch = Vec<Message>;
75
/// Acknowledgement channel utilized to send feedback to the input module on the successful
/// or unsuccessful processing of an event emitted by that input.
///
/// This is the sending half of a oneshot channel carrying a [crate::Status].
pub type CallbackChan = oneshot::Sender<Status>;
79
80/// Helper function to generate transmitting and receiver pair that can be utilized for
81/// the [crate::CallbackChan] for input modules.
82pub fn new_callback_chan() -> (oneshot::Sender<Status>, oneshot::Receiver<Status>) {
83 oneshot::channel()
84}
85
/// Status returned through the [crate::CallbackChan] to input modules.
///
/// Equality is derived so that a received status can be compared directly,
/// e.g. `status == Status::Processed`, or asserted on in tests.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum Status {
    /// The message was processed successfully
    Processed,
    /// Processing failed partially or fully; carries the list of failures received.
    Errored(Vec<String>),
}
94
/// Closer trait utilized by input and output modules to optionally gracefully
/// exit upon successful processing of the pipeline.
///
/// It is a supertrait of every module trait declared in this file, so each module
/// gets `close`; modules with nothing to release can rely on the default.
#[async_trait]
pub trait Closer {
    /// Gracefully terminate resources prior to shutdown of the processing pipeline.
    ///
    /// The default implementation does nothing and returns `Ok(())`.
    async fn close(&mut self) -> Result<(), Error> {
        Ok(())
    }
}
104
/// Input module trait to insert [crate::Message] into the processing pipeline.
#[async_trait]
pub trait Input: Closer {
    /// Read a single message from the input module. On success, returns a tuple
    /// containing the [crate::Message] and an optional [crate::CallbackChan] for
    /// reporting status back.
    ///
    /// # Errors
    ///
    /// Returns an [crate::Error] on failure. Per the documentation of that enum,
    /// [crate::Error::EndOfInput] indicates the input has finished and
    /// [crate::Error::NoInputToReturn] indicates there are no messages to process.
    async fn read(&mut self) -> Result<(Message, Option<CallbackChan>), Error>;
}
112
/// InputBatch module trait to insert one to many [crate::Message] into the pipeline.
/// InputBatch is currently not yet introduced into the runtime.
#[async_trait]
pub trait InputBatch: Closer {
    /// Read multiple messages from the input module. On success, returns a tuple
    /// containing the [crate::MessageBatch] and an optional [crate::CallbackChan] for
    /// reporting status back.
    ///
    /// Note that, unlike [crate::Input::read], this method is declared as a
    /// synchronous `fn`, not an `async fn`.
    fn read_batch(&mut self) -> Result<(MessageBatch, Option<CallbackChan>), Error>;
}
121
/// Output module trait to write a single [crate::Message] to the output
#[async_trait]
pub trait Output: Closer {
    /// Writes `message` to the output module, taking ownership of it.
    ///
    /// # Errors
    ///
    /// Returns an [crate::Error] if the write fails.
    async fn write(&mut self, message: Message) -> Result<(), Error>;
}
128
129/// Batching output module utilized to write many messages to the output
130/// based on batch_size and provided interval. Defaults are `batch_size: 500`, `interval: 10 seconds`.
131/// The queuing mechanism is provided by the runtime and will call `write_batch` if the batch size
132/// has been reached, or the desired interval has passed, whichever comes first.
133#[async_trait]
134pub trait OutputBatch: Closer {
135 /// Write [crate::MessageBatch] to the output in accordance with the provided batching policy
136 async fn write_batch(&mut self, message_batch: MessageBatch) -> Result<(), Error>;
137
138 /// returns the desired size of the [crate::MessageBatch] to provide the the output module
139 async fn batch_size(&self) -> usize {
140 500
141 }
142
143 /// returns the duration of how long the runtime should wait between sending [crate::MessageBatch] to
144 /// the output.
145 async fn interval(&self) -> Duration {
146 Duration::from_secs(10)
147 }
148}
149
/// Processor is the processing module trait utilized to accept a single [crate::Message] and provide
/// one to many additional messages through [crate::MessageBatch]
#[async_trait]
pub trait Processor: Closer {
    /// Process a given [crate::Message] and return the transformed, one to many messages to continue
    /// on the pipeline.
    ///
    /// Unlike the read/write methods of the input and output traits, this takes `&self`
    /// rather than `&mut self`.
    ///
    /// # Errors
    ///
    /// Returns an [crate::Error] if processing fails.
    async fn process(&self, message: Message) -> Result<MessageBatch, Error>;
}
158
/// Enum to capture errors that occurred through the pipeline
#[derive(Debug, Error)]
pub enum Error {
    /// Yaml parsing errors found within the declarative language provided.
    /// Displays with the same `UnableToSerializeObject` prefix as the JSON variant below.
    #[error("UnableToSerializeObject: {0}")]
    UnableToSerializeYamlObject(#[from] serde_yaml::Error),

    /// JSON serialization is primarily utilized as a preparser to passing the declarative
    /// language to the given module by utilizing jsonschema to validate the input. This is unlikely
    /// to occur in practice since the yaml configuration object is converted to json for this step.
    /// Displays with the same `UnableToSerializeObject` prefix as the YAML variant above.
    #[error("UnableToSerializeObject: {0}")]
    UnableToSerializeJsonObject(#[from] serde_json::Error),

    /// Validation errors discovered by the jsonschema evaluation of the declarative configuration language
    /// provided to a given module
    #[error("ValidationError: {0}")]
    Validation(String),

    /// Error with the processing pipeline due to a failure of internal libraries or utilized modules
    #[error("ExecutionError: {0}")]
    ExecutionError(String),

    /// EndOfInput is an error enum variant to indicate that the input module has finished and will not
    /// receive additional input. This error triggers a graceful shutdown of the processing pipeline
    #[error("EndOfInput")]
    EndOfInput,

    /// Unable to secure internal mutex lock. Note that this displays as `InternalServerError`.
    #[error("InternalServerError")]
    UnableToSecureLock,

    /// A plugin of the same category (input, processing, output) has already been provided
    #[error("DuplicateRegisteredName: {0}")]
    DuplicateRegisteredName(String),

    /// The provided jsonschema configuration for a module is incorrect
    #[error("InvalidValidationSchema: {0}")]
    InvalidValidationSchema(String),

    /// Configuration provided to a module is invalid
    #[error("ConfigurationValidationFailed: {0}")]
    ConfigFailedValidation(String),

    /// Module is not registered with the runtime.
    #[error("ConfigurationItemNotFound: {0}")]
    ConfigurationItemNotFound(String),

    /// Not yet implemented functionality
    #[error("NotYetImplemented")]
    NotYetImplemented,

    /// Failure to send to an internal channel processing [crate::Message]s
    #[error("PipelineProcessingError: {0}")]
    UnableToSendToChannel(String),

    /// Failure to receive from internal raw channel
    #[error("ChannelRecvError: {0}")]
    RecvChannelError(#[from] flume::RecvError),

    /// Processing module failed with an unrecoverable error. Processing of [crate::Message] is stopped and
    /// [crate::Status] is returned to the input module once all messages in this lineage have been processed
    #[error("ProcessorFailure: {0}")]
    ProcessingError(String),

    /// Conditional check has failed for [crate::Message], such as use with [crate::modules::processors::switch]
    /// conditions
    #[error("ConditionalCheckfailed")]
    ConditionalCheckfailed,

    /// Error encountered while calling [crate::Input::read] on an input module
    #[error("InputError: {0}")]
    InputError(String),

    /// Error encountered while calling [crate::Output::write] or [crate::OutputBatch::write_batch] on an output module
    #[error("OutputError: {0}")]
    OutputError(String),

    /// Error returned by input module to indicate there are no messages to process
    #[error("NoInputToReturn")]
    NoInputToReturn,
}